get callable from ThreadPoolTaskExecutor or cast Runnable to Callable


I'm using ThreadPoolTaskExecutor to execute my tasks, which are implementations of the Callable interface. I just want to check at some point whether a task is still in the pool (monitoring). How can I do that? I know that I can get the queue from ThreadPoolExecutor, but how can I cast a Runnable to a Callable?

Basically I have this callable

public interface IFormatter extends Callable<Integer>{
    Long getOrderId();
}

I'm executing it like this

threadPoolExecutor.submit(new Formatter(order));

Finally, I'd like to loop through the queue of the ExecutorService in some async method and check whether the task with a given orderId is still there.


There are 2 answers

0
Holger On BEST ANSWER

As explained in this answer, you may get control over the FutureTask wrapping the Callable by creating it manually and enqueuing via execute. Otherwise, submit will wrap your Callable into an ExecutorService-specific object and put it into the queue, making it impossible to query properties of the Callable via standard APIs.
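To make the wrapping concrete, here is a minimal sketch (the class name SubmitWrappingDemo and its helper method are made up for illustration). It fills a one-thread pool so that a second submission has to wait in the queue, then inspects what actually sits there.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original answer.
class SubmitWrappingDemo {

    /** Returns the element waiting in the queue after two submissions to a one-thread pool. */
    static Object queuedElementAfterSubmit() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // The first task is handed straight to the only worker and blocks it.
            pool.submit(() -> { release.await(); return 0; });
            // The second task cannot run yet, so it is placed in the work queue.
            Callable<Integer> waiting = () -> 42;
            pool.submit(waiting);
            return pool.getQueue().peek();
        } finally {
            release.countDown(); // let the blocked task finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Object queued = queuedElementAfterSubmit();
        System.out.println("instanceof Callable:   " + (queued instanceof Callable));
        System.out.println("instanceof FutureTask: " + (queued instanceof FutureTask));
    }
}
```

With the standard ThreadPoolExecutor, the queued element should turn out to be a FutureTask rather than the Callable itself, which is why a cast from Runnable to Callable cannot succeed.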

Using the custom FutureTask

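import java.util.concurrent.FutureTask;

// FutureTask that keeps a reference to the wrapped IFormatter so its order ID stays queryable
class MyFutureTask extends FutureTask<Integer> {
    final IFormatter theCallable;

    public MyFutureTask(IFormatter callable) {
        super(callable);
        theCallable = callable;
    }
    Long getOrderId() {
        return theCallable.getOrderId();
    }
}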

enqueuing it via threadPoolExecutor.execute(new MyFutureTask(new Formatter(order)));,

you can query order IDs on the queue:

// Scans a snapshot of the work queue; tasks already picked up by a worker thread are no longer in it
public static boolean isEnqueued(ThreadPoolExecutor e, Long id) {
    for (Object o : e.getQueue().toArray()) {
        if (o instanceof MyFutureTask && Objects.equals(((MyFutureTask) o).getOrderId(), id))
            return true;
    }
    return false;
}
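Put together, the pieces above can be exercised as follows. This is only a sketch: BlockingFormatter is a hypothetical stand-in for the asker's Formatter (which the question does not show), and QueueMonitorDemo is an invented name. The pool has a single thread so that it is predictable which task runs and which one waits.

```java
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

interface IFormatter extends Callable<Integer> {
    Long getOrderId();
}

// Hypothetical IFormatter that blocks until a latch opens, so tasks stay in the pool.
class BlockingFormatter implements IFormatter {
    private final Long orderId;
    private final CountDownLatch gate;

    BlockingFormatter(Long orderId, CountDownLatch gate) {
        this.orderId = orderId;
        this.gate = gate;
    }
    @Override public Long getOrderId() { return orderId; }
    @Override public Integer call() throws InterruptedException {
        gate.await();
        return 0;
    }
}

class MyFutureTask extends FutureTask<Integer> {
    final IFormatter theCallable;

    MyFutureTask(IFormatter callable) {
        super(callable);
        theCallable = callable;
    }
    Long getOrderId() { return theCallable.getOrderId(); }
}

// Hypothetical demo class, not part of the original answer.
class QueueMonitorDemo {

    static boolean isEnqueued(ThreadPoolExecutor e, Long id) {
        for (Object o : e.getQueue().toArray()) {
            if (o instanceof MyFutureTask && Objects.equals(((MyFutureTask) o).getOrderId(), id))
                return true;
        }
        return false;
    }

    /** Returns isEnqueued for order 1 (running), order 2 (waiting) and order 3 (never submitted). */
    static boolean[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch gate = new CountDownLatch(1);
        try {
            pool.execute(new MyFutureTask(new BlockingFormatter(1L, gate))); // taken by the worker
            pool.execute(new MyFutureTask(new BlockingFormatter(2L, gate))); // waits in the queue
            return new boolean[] {
                isEnqueued(pool, 1L), isEnqueued(pool, 2L), isEnqueued(pool, 3L)
            };
        } finally {
            gate.countDown(); // unblock both tasks
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("order 1 enqueued: " + r[0]);
        System.out.println("order 2 enqueued: " + r[1]);
        System.out.println("order 3 enqueued: " + r[2]);
    }
}
```

Note that order 1 is reported as not enqueued even though it has not finished: a task that a worker has already picked up is no longer in the queue, so a queue scan only answers "is it still waiting", not "is it still in the pool".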

This works for any ExecutorService (assuming it has a queue). If you are only using a ThreadPoolExecutor, you may customize its creation of FutureTask instances (starting with Java 6) instead of relying on the submitter to do it:

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
        TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
            workQueue, threadFactory);
    }
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
        TimeUnit unit, BlockingQueue<Runnable> workQueue,
        RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
            workQueue, handler);
    }
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
        TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
        RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
            workQueue, threadFactory, handler);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if(callable instanceof IFormatter)
            return (FutureTask<T>)new MyFutureTask((IFormatter)callable);
        return super.newTaskFor(callable);
    }
}

Then, when using an instance of MyThreadPoolExecutor instead of ThreadPoolExecutor, every submission of an IFormatter instance will automatically be wrapped in a MyFutureTask instead of the standard FutureTask. The drawback is that this works only with this specific ExecutorService, and the generic method generates an unchecked warning for the special treatment.
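As a rough usage sketch of the subclass approach: the code below goes through plain submit and then lists the order IDs still waiting in the queue. BlockingFormatter, SubmitMonitorDemo and queuedOrderIds are hypothetical names introduced for the example, and only one of the four constructors is repeated to keep it short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

interface IFormatter extends Callable<Integer> {
    Long getOrderId();
}

// Hypothetical IFormatter that blocks until a latch opens, so tasks stay in the pool.
class BlockingFormatter implements IFormatter {
    private final Long orderId;
    private final CountDownLatch gate;

    BlockingFormatter(Long orderId, CountDownLatch gate) {
        this.orderId = orderId;
        this.gate = gate;
    }
    @Override public Long getOrderId() { return orderId; }
    @Override public Integer call() throws InterruptedException {
        gate.await();
        return 0;
    }
}

class MyFutureTask extends FutureTask<Integer> {
    final IFormatter theCallable;

    MyFutureTask(IFormatter callable) {
        super(callable);
        theCallable = callable;
    }
    Long getOrderId() { return theCallable.getOrderId(); }
}

class MyThreadPoolExecutor extends ThreadPoolExecutor {
    MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                         TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    @SuppressWarnings("unchecked") // the cast is safe because IFormatter is a Callable<Integer>
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if (callable instanceof IFormatter)
            return (FutureTask<T>) new MyFutureTask((IFormatter) callable);
        return super.newTaskFor(callable);
    }
}

// Hypothetical demo class, not part of the original answer.
class SubmitMonitorDemo {

    /** Collects the order IDs of all MyFutureTask instances currently waiting in the queue. */
    static List<Long> queuedOrderIds(ThreadPoolExecutor e) {
        List<Long> ids = new ArrayList<>();
        for (Runnable r : e.getQueue()) {
            if (r instanceof MyFutureTask) ids.add(((MyFutureTask) r).getOrderId());
        }
        return ids;
    }

    static List<Long> run() {
        ThreadPoolExecutor pool = new MyThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch gate = new CountDownLatch(1);
        try {
            pool.submit(new BlockingFormatter(1L, gate)); // occupies the single worker
            pool.submit(new BlockingFormatter(2L, gate)); // queued
            pool.submit(new BlockingFormatter(3L, gate)); // queued
            return queuedOrderIds(pool);
        } finally {
            gate.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("waiting order IDs: " + run());
    }
}
```

The submitting code no longer needs to know about MyFutureTask at all; the wrapping happens inside newTaskFor, and the monitoring side only has to look for MyFutureTask instances in the queue.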

1
David Ehrmann On

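Since it looks like you want to monitor the ExecutorService, look into overriding decorateTask() (a protected hook of ScheduledThreadPoolExecutor; a plain ThreadPoolExecutor offers newTaskFor() instead). You can then decorate the future to monitor its state.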
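One possible reading of this suggestion, as a sketch only: since decorateTask is defined on ScheduledThreadPoolExecutor, the example below subclasses that executor and uses the hook to remember each IFormatter's future by order ID instead of wrapping it. MonitoringExecutor, isPending, BlockingFormatter and DecorateTaskDemo are all hypothetical names, and this is not necessarily what the answerer had in mind.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;

interface IFormatter extends Callable<Integer> {
    Long getOrderId();
}

// Hypothetical IFormatter that blocks until a latch opens, so the task stays in the pool.
class BlockingFormatter implements IFormatter {
    private final Long orderId;
    private final CountDownLatch gate;

    BlockingFormatter(Long orderId, CountDownLatch gate) {
        this.orderId = orderId;
        this.gate = gate;
    }
    @Override public Long getOrderId() { return orderId; }
    @Override public Integer call() throws InterruptedException {
        gate.await();
        return 0;
    }
}

// Hypothetical executor that tracks futures by order ID via the decorateTask hook.
class MonitoringExecutor extends ScheduledThreadPoolExecutor {
    private final Map<Long, Future<?>> byOrderId = new ConcurrentHashMap<>();

    MonitoringExecutor(int corePoolSize) {
        super(corePoolSize);
    }

    @Override
    protected <V> RunnableScheduledFuture<V> decorateTask(
            Callable<V> callable, RunnableScheduledFuture<V> task) {
        // Remember the future for IFormatter submissions; the task itself is returned unchanged.
        if (callable instanceof IFormatter)
            byOrderId.put(((IFormatter) callable).getOrderId(), task);
        return task;
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        byOrderId.values().remove(r); // drop finished tasks so the map does not grow forever
    }

    /** True while the task for this order is either queued or running. */
    boolean isPending(Long orderId) {
        Future<?> f = byOrderId.get(orderId);
        return f != null && !f.isDone();
    }
}

// Hypothetical demo class.
class DecorateTaskDemo {

    /** Returns isPending for order 7 before and after the task completes. */
    static boolean[] run() {
        MonitoringExecutor pool = new MonitoringExecutor(1);
        CountDownLatch gate = new CountDownLatch(1);
        try {
            Future<Integer> f = pool.submit(new BlockingFormatter(7L, gate));
            boolean before = pool.isPending(7L);
            gate.countDown(); // let the task finish
            f.get();          // wait for completion
            boolean after = pool.isPending(7L);
            return new boolean[] { before, after };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("pending before completion: " + r[0]);
        System.out.println("pending after completion:  " + r[1]);
    }
}
```

Unlike a scan of the work queue, this variant treats a task as pending for as long as it is queued or running, which may be closer to "still in the pool" as the question phrases it.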