Unable to get CallableThread in RejectionHandler


I have a thread pool that runs Callable worker tasks and is configured with a RejectedExecutionHandler. I need to get hold of the rejected Callable task inside the handler, but I am unable to.

In the example below, I need the uniqueId of the Callable task for which the rejection handler was invoked. Inside the handler, the Runnable turns out to be a FutureTask, whereas I expected to be able to cast it to my Callable worker.

Please help me get the Callable worker instance in the rejection handler.

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    RejectionDemo(){
        Random random = new Random();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), new RejectionHandlerImpl());
        CallableWorkerThread workers[] = 
                new CallableWorkerThread[10];
        for (int i=0; i< workers.length; i++){
            workers[i] = new CallableWorkerThread(random.nextInt(100));
            FutureTask<Integer> task = new FutureTask<Integer>(workers[i]);
            executor.submit(task);
        }
    }

    public static void main(String args[]){
        RejectionDemo demo = new RejectionDemo();
    }
    public class CallableWorkerThread implements
        Callable<Integer> {
        private int uniqueId;

        CallableWorkerThread(int uniqueId) {
            this.uniqueId = uniqueId;
        }

        public Integer call() {
            System.out.println("Unique id="+uniqueId);
            return uniqueId;
        }
        public String toString(){
            return ""+uniqueId;
        }
    }
    class RejectionHandlerImpl implements RejectedExecutionHandler{
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try{
                System.out.println(r);
            }catch(Throwable t){
                t.printStackTrace();
            }
        }
    }
}

Output

java.util.concurrent.FutureTask@70036428
Unique id=68
java.util.concurrent.FutureTask@6ea4b78b
java.util.concurrent.FutureTask@e3f6d
java.util.concurrent.FutureTask@1ce84763
java.util.concurrent.FutureTask@55a6c368
java.util.concurrent.FutureTask@4e77b794
java.util.concurrent.FutureTask@15b57dcb
Unique id=55
Unique id=83

I expected to see the CallableWorkerThread rather than a FutureTask. How can I get the CallableWorkerThread instance?

Answer by Holger (best answer)

In your code

workers[i] = new CallableWorkerThread(random.nextInt(100));
FutureTask<Integer> task = new FutureTask<Integer>(workers[i]);
executor.submit(task);

you create a FutureTask that wraps the CallableWorkerThread instance, but then you call submit, which accepts an arbitrary Runnable and returns a new FutureTask that wraps that Runnable.

In other words, you are wrapping your FutureTask in another FutureTask. There are two ways to solve this:

  1. Use

    workers[i] = new CallableWorkerThread(random.nextInt(100));
    executor.submit(workers[i]);
    

    to let the ExecutorService wrap your Callable inside a FutureTask.

  2. Use

    workers[i] = new CallableWorkerThread(random.nextInt(100));
    executor.execute(new FutureTask<Integer>(workers[i]));
    

    to wrap the Callable manually and enqueue it as a Runnable without further wrapping (note the use of execute rather than submit).
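The difference between submit and execute described above can be checked with a small sketch. The class name DoubleWrapDemo and the method handlerSeesSameObject are hypothetical names made up for this illustration, and the pool is shut down up front only so that every task is rejected deterministically; this is not part of the original code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class DoubleWrapDemo {

    /**
     * Hypothetical helper: returns true if the rejection handler received
     * the very same object that was handed to the pool.
     */
    static boolean handlerSeesSameObject(boolean useSubmit) {
        final AtomicReference<Runnable> seen = new AtomicReference<>();
        // Record whatever Runnable the pool passes to the handler.
        RejectedExecutionHandler handler = (r, pool) -> seen.set(r);

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1), handler);
        // Assumption for the demo: a shut-down pool rejects every new task,
        // so the handler is guaranteed to run.
        executor.shutdown();

        FutureTask<Integer> task = new FutureTask<Integer>(() -> 42);
        if (useSubmit) {
            executor.submit(task);   // wraps 'task' in another FutureTask
        } else {
            executor.execute(task);  // passes 'task' through unchanged
        }
        return seen.get() == task;
    }

    public static void main(String[] args) {
        System.out.println("submit:  " + handlerSeesSameObject(true));
        System.out.println("execute: " + handlerSeesSameObject(false));
    }
}
```

With submit the handler receives the outer wrapper, so the identity check fails; with execute it receives the FutureTask that was created by hand.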

Since you want to enable retrieval of the original Callable, the second option is for you, as it gives you full control over the FutureTask instance:

static class MyFutureTask<T> extends FutureTask<T> {
    final Callable<T> theCallable;

    public MyFutureTask(Callable<T> callable) {
        super(callable);
        theCallable=callable;
    }
}

Submitting code:

    for (int i=0; i< workers.length; i++){
        workers[i] = new CallableWorkerThread(random.nextInt(100));
        executor.execute(new MyFutureTask<Integer>(workers[i]));
    }

RejectedExecutionHandler:

class RejectionHandlerImpl implements RejectedExecutionHandler{
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r instanceof MyFutureTask) {
            // Recover the original Callable stored by MyFutureTask.
            MyFutureTask<?> myFutureTask = (MyFutureTask<?>) r;
            Callable<?> c = myFutureTask.theCallable;
            System.out.println(c);
        } else {
            System.out.println(r);
        }
    }
}
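Putting the pieces together, here is a minimal self-contained sketch that also reads the uniqueId asked for in the question. The names RejectedCallableDemo, Worker, gate and rejectedIds are hypothetical. The latch is an assumption added only to make the outcome repeatable: it keeps the single pool thread busy, so with a queue capacity of 2 the fourth and fifth tasks are always the ones rejected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectedCallableDemo {

    /** Stand-in for CallableWorkerThread; blocks on a latch (demo assumption). */
    static class Worker implements Callable<Integer> {
        final int uniqueId;
        final CountDownLatch gate;

        Worker(int uniqueId, CountDownLatch gate) {
            this.uniqueId = uniqueId;
            this.gate = gate;
        }

        @Override
        public Integer call() throws InterruptedException {
            gate.await(); // keep the pool thread busy until released
            return uniqueId;
        }
    }

    /** FutureTask that remembers the Callable it wraps. */
    static class MyFutureTask<T> extends FutureTask<T> {
        final Callable<T> theCallable;

        MyFutureTask(Callable<T> callable) {
            super(callable);
            theCallable = callable;
        }
    }

    /** Runs five tasks on a 1-thread pool with a 2-slot queue; returns the rejected ids. */
    static List<Integer> rejectedIds() {
        final List<Integer> rejected = new ArrayList<>();
        // The handler runs in the submitting thread, so a plain list is enough here.
        RejectedExecutionHandler handler = (r, pool) -> {
            if (r instanceof MyFutureTask) {
                Callable<?> c = ((MyFutureTask<?>) r).theCallable;
                if (c instanceof Worker) {
                    rejected.add(((Worker) c).uniqueId);
                }
            }
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2), handler);

        CountDownLatch gate = new CountDownLatch(1);
        for (int id = 0; id < 5; id++) {
            // execute, not submit, so the pool does not add a second wrapper
            executor.execute(new MyFutureTask<Integer>(new Worker(id, gate)));
        }

        gate.countDown();    // let the accepted tasks finish
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("Rejected ids: " + rejectedIds()); // prints "Rejected ids: [3, 4]"
    }
}
```

Task 0 occupies the only thread, tasks 1 and 2 fill the queue, and tasks 3 and 4 reach the handler, where the cast back to Worker exposes uniqueId.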