Waiting for a hierarchy of tasks to complete

177 views Asked by At

This is an abstraction of my actual problem, but I hope it's accurate enough to explain things.

I'm processing a file hierarchy, and I'm processing the files asynchronously using a Java ThreadPoolExecutor with a finite number of threads and an unbounded queue. Starting at a particular directory, I add a task to the queue for each file within the directory. Then I shutdown and await completion.

The tasks are despatched using Executor.execute(Runnable).

The problem is that when the directory contains further directories, one of these tasks may spawn further tasks, and these tasks are not being executed because at the top level the ThreadPoolExecutor has already been shut down.

So my question is, in my top level thread, how do I await completion of the whole hierarchy of tasks, recognising that they haven't actually all started yet?

I did say that this is an abstraction of the problem. If it were exactly as described, I could walk the whole hierarchy in the orginal parent thread and fire off all tasks from there. But in my real problem I can't do that: it's an essential feature of the problem that a spawned child task itself submits further tasks.

2

There are 2 answers

3
Igor Tseluyko On

In your top level thread you can use something like that:

    CountDownLatch latch = new CountDownLatch(N);

    Executor.execute(new Worker(latch, someTask));
    .
    .
    .
    N

    latch.await();

A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times. http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html

2
Zephyr Guo On

Maybe actual problem is hard to describe.According to your abstract problem,the following code can provide a little idea to you.It also can deal with issue of deadlock.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class Test {

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);

        RecursionTask rootTask = new RecursionTask(pool);

        pool.execute(rootTask);

        while(!rootTask.isCompleted()){
            Thread.sleep(1000);
        }

        System.out.println("root task has completed, result="+rootTask.getResult());

    }

}


class RecursionTask implements Runnable{
    private int res;
    private RecursionTask parent;
    private ExecutorService pool;
    private Boolean isCompleted = false;

    /*
     * For constructing root task
     */
    public RecursionTask(ExecutorService pool){
        this.pool = pool;
    }

    /*
     * For constructing child task
     */
    private RecursionTask(RecursionTask parent,ExecutorService pool){
        this.parent = parent;
        this.pool = pool;
    }

    @Override
    public void run() {

        //do something
        if(parent != null)
            res = parent.res + 1;
        else
            res = 0;

        if(res<3){ //child task
            System.out.println("spawn a child task");
            pool.execute(new RecursionTask(this,pool));
        }else{ //last child
            finish(res);
        }

    }

    private void finish(int res){
        isCompleted = true;
        onCompleted(res);
        if(parent != null){
            parent.finish(res);
        }
    }

    public int getResult() throws Exception{
        if(isCompleted()) return res;
        throw new Exception("The task has not been completed.");
    }

    public boolean isCompleted(){
        return isCompleted;
    }

    /**
     * It will be invoked when all child task is completed.
     * @param res The result of child task.
     */
    private void onCompleted(int res){
        System.out.println("Task has been completed, result="+res);
        this.res = res;
    }

}

It can be extended to support multi-child task(a task spawn multi task).Modify type of isCompleted to int.Look like complete_count.