MPSC queues: Handling a list of CompletableFutures cleanly


I am trying to queue a list of tasks onto a blocking queue asynchronously. The BlockingQueue is consumed in batches by a single thread, which reports the completion of each task.

For example: 1) A web request generates a List<Task>, and those tasks are queued to a BlockingQueue that a single-threaded consumer drains in batches. 2) The web request blocks until the tasks for that request are completed. 3) The web request is notified when ALL of the tasks for that request have been completed by the single-threaded consumer.
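The batch-wise consumption in step 1 could look roughly like the sketch below. It assumes a batch means "wait for one element, then grab whatever else is already queued, up to a limit". The class name BatchDrainSketch and the parameter maxBatch are made up for illustration; take() and drainTo() are the standard BlockingQueue methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper class, not part of the original question.
final class BatchDrainSketch {

    // Blocks until at least one element is available, then drains up to
    // maxBatch - 1 further elements without blocking.
    static <T> List<T> takeBatch(BlockingQueue<T> queue, int maxBatch) throws InterruptedException {
        List<T> batch = new ArrayList<>(maxBatch);
        batch.add(queue.take());             // wait for the first element
        queue.drainTo(batch, maxBatch - 1);  // take whatever else is already queued
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.addAll(List.of("a", "b", "c", "d", "e"));
        System.out.println(takeBatch(queue, 3)); // [a, b, c]
        System.out.println(takeBatch(queue, 3)); // [d, e]
    }
}
```

With this shape the consumer never spins on an empty queue, and a batch may be smaller than maxBatch when fewer elements are waiting.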

I thought of pushing a Map.Entry<CompletableFuture<Task>, Task> onto the queue and calling complete() on the CompletableFuture that corresponds to each Task once it has been processed. The web request then simply waits for all of its futures to complete.

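import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

// Pair is a simple holder with public fields left and right; Task is the unit of work.
class BatchConsumer implements Runnable {
    private final BlockingQueue<Pair<CompletableFuture<Task>, Task>> queue;
    private final int n; // maximum batch size

    BatchConsumer(BlockingQueue<Pair<CompletableFuture<Task>, Task>> queue, int n) {
        this.queue = queue;
        this.n = n;
    }

    @Override
    public void run() {
        try {
            while (true) {
                List<Pair<CompletableFuture<Task>, Task>> batchTasks = new ArrayList<>(n);
                batchTasks.add(queue.take());      // block until at least one task arrives
                queue.drainTo(batchTasks, n - 1);  // then take up to n - 1 more
                compute(batchTasks);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();    // stop consuming when interrupted
        }
    }

    void compute(List<Pair<CompletableFuture<Task>, Task>> tasks) {
        // ... batch operation
        tasks.forEach(pair -> pair.left.complete(pair.right)); // mark each future complete
    }
}

class WebRequest {
    void get() {
        // .... (taskList and queue come from the elided part)
        List<Pair<CompletableFuture<Task>, Task>> taskMap = new ArrayList<>(taskList.size());
        taskList.forEach(val -> taskMap.add(new Pair<>(new CompletableFuture<>(), val)));
        queue.addAll(taskMap);                      // hand the pairs to the consumer
        taskMap.forEach(pair -> pair.left.join());  // block until every task is done
    }
}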
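For reference, here is a minimal, self-contained sketch of the same idea that actually runs. It is only one possible shape and makes several assumptions: tasks are plain Strings, the batch operation is passed in as a Consumer, and the names BatchingConsumerSketch, Pending, submitAll, runConsumer and batchOp are invented for illustration. It combines the per-task futures with CompletableFuture.allOf, so the request waits on a single future instead of joining each one in a loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch; none of these names come from the original question.
final class BatchingConsumerSketch<T> {

    // Stand-in for the Pair of future and task: the task carries its own future.
    static final class Pending<T> {
        final T task;
        final CompletableFuture<T> done = new CompletableFuture<>();
        Pending(T task) { this.task = task; }
    }

    private final BlockingQueue<Pending<T>> queue = new LinkedBlockingQueue<>();
    private final int maxBatch;
    private final Consumer<List<T>> batchOp; // assumed batch operation

    BatchingConsumerSketch(int maxBatch, Consumer<List<T>> batchOp) {
        this.maxBatch = maxBatch;
        this.batchOp = batchOp;
    }

    // Producer side (any thread): enqueue all tasks and return one future
    // that completes once every task of this request has been processed.
    CompletableFuture<Void> submitAll(List<T> tasks) {
        List<CompletableFuture<T>> futures = new ArrayList<>(tasks.size());
        for (T task : tasks) {
            Pending<T> pending = new Pending<>(task);
            futures.add(pending.done);
            queue.add(pending);
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    // Consumer side: intended to run on exactly one thread until interrupted.
    void runConsumer() {
        List<Pending<T>> batch = new ArrayList<>(maxBatch);
        try {
            while (!Thread.currentThread().isInterrupted()) {
                batch.add(queue.take());             // wait for the first task
                queue.drainTo(batch, maxBatch - 1);  // add whatever else is queued
                List<T> tasks = new ArrayList<>(batch.size());
                for (Pending<T> p : batch) tasks.add(p.task);
                try {
                    batchOp.accept(tasks);
                    for (Pending<T> p : batch) p.done.complete(p.task);
                } catch (RuntimeException e) {
                    // A failed batch fails every future in it, so waiters do not hang.
                    for (Pending<T> p : batch) p.done.completeExceptionally(e);
                }
                batch.clear();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        BatchingConsumerSketch<String> sketch =
                new BatchingConsumerSketch<String>(10, batch -> { /* batch work */ });
        Thread consumer = new Thread(sketch::runConsumer, "batch-consumer");
        consumer.setDaemon(true);
        consumer.start();

        // The "web request": blocks until all of its tasks are done.
        sketch.submitAll(List.of("t1", "t2", "t3")).join();
        System.out.println("all tasks for this request completed");
        consumer.interrupt();
    }
}
```

Returning the combined future from submitAll also leaves the caller free to attach a callback (for example with thenRun) instead of blocking, if the web framework supports asynchronous responses.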

Is there a better (CLEANER) way to implement this kind of use case?


There are 0 answers