Creating multiple level threads in a hierarchical manner


I am trying to create an application that creates threads in a tree-like manner. My main method is at Level0 and creates the threads in Level1. Each thread in Level1 then creates its own set of threads as Level2, and so on.

Below is the code I am trying to use, based on ExecutorService:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadTree {

public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();

//This is the main , level0

List<Integer> ll = new ArrayList<>();
ll.add(2);
ll.add(5);
ll.add(8);

// Submit one Level1 task per entry; each task builds its own pool of that size
for (Integer n : ll) {
    exec.submit(new Level1(n));
}
}
}

class Level1 implements Runnable{

private ScheduledExecutorService exec;
private Integer num;

public Level1(Integer n){
    num = n;
    exec = Executors
            .newScheduledThreadPool(n);
}

@Override
public void run() {

    for(int i=0;i<num;i++){
        exec.scheduleAtFixedRate(new Level2(), 0, 2, TimeUnit.SECONDS);
    }


}

}

class Level2 implements Runnable{

@Override
public void run() {

    System.out.println("Current Thread ID : " + Thread.currentThread().getId() + ", Current Thread Name : "
            + Thread.currentThread().getName());
    //Do some task and create new threads

}

}

I have 2 questions:

  1. Is this the only approach to creating threads in a tree-like manner? Are there other ways to handle this effectively, with some grouping? I have shown 3 levels, but there can be more.
  2. If this is a good approach, what is the best way to propagate a failure in a thread at one level to the level above it, and so on up the tree?

Thanks in advance.

Answer by hemant1900 (best answer)

I think you can achieve this with the following steps:

  • Use only one thread pool executor.
  • Create a notification mechanism from child task to parent. (Note that a task is different from a thread. A task is just a Runnable object, while a Thread is a Java thread that executes the Runnable task through the executor.)

Please refer to the sample below:

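import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadTree {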

private static final ExecutorService executor = Executors.newCachedThreadPool();

public static void main(String[] args) {
    List<Integer> level1Nodes = new ArrayList<Integer>();
    level1Nodes.add(2);
    level1Nodes.add(5);
    level1Nodes.add(8);
    // start threads
    for (Integer num : level1Nodes) {
        executor.submit(new Level1(num));
    }
}

private static class Notification {
    private final Object result;
    private final Exception rootException;

    public Notification(Object result, Exception rootException) {
        this.result = result;
        this.rootException = rootException;
    }

    public Object getResult() {
        return result;
    }

    public Exception getRootException() {
        return rootException;
    }
}

private static abstract class NotificationHandler {
    private final AtomicInteger expectedNotifications;
    private final List<Notification> notifications;

    public NotificationHandler(int expectedNotifications) {
        this.expectedNotifications = new AtomicInteger(expectedNotifications);
        this.notifications = new ArrayList<Notification>();
    }

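    // synchronized: children call this from different pool threads, and ArrayList is not thread-safe
    public synchronized void handleNotification(Notification notification) {
        notifications.add(notification);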
        if (expectedNotifications.decrementAndGet() == 0) {
            postRun(notifications);
        }
    }

    public void postRun(List<Notification> notifications) {
        for (Notification notification : notifications) {
            System.out.println("Status: " + (notification.getRootException() == null ? "Success" : "Failed") + ", Result: " + (notification.getResult() != null ? notification.getResult() : "No result"));
        }
    }
}

private static class Level1 extends NotificationHandler implements Runnable {
    private final int num;

    public Level1(int num) {
        super(num);
        this.num = num;
    }

    public void run() {
        for (int i = 0; i < num; i++) {
            executor.submit(new Level2(2, this)); // 2 is just an assumed number of children for each Level2 node
        }
    }
}

private static class Level2 extends NotificationHandler implements Runnable {
    private final int num;
    private final NotificationHandler parentNotificationHandler;

    public Level2(int num, NotificationHandler parentNotificationHandler) {
        super(num);
        this.num = num;
        this.parentNotificationHandler = parentNotificationHandler;
    }

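    public void run() {
        for (int i = 0; i < num; i++) {
            executor.submit(new Level3(2, this)); // Level3 is the leaf level here, so its count argument is unused
        }
    }

    @Override
    public void postRun(List<Notification> notifications) {
        super.postRun(notifications);
        // all children have reported, so this task can now notify its own parent
        parentNotificationHandler.handleNotification(new Notification("done", null));
    }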
}

private static class Level3 extends NotificationHandler implements Runnable {
    private final int num;
    private final NotificationHandler parentNotificationHandler;

    public Level3(int num, NotificationHandler parentNotificationHandler) {
        super(num);
        this.num = num;
        this.parentNotificationHandler = parentNotificationHandler;
    }

    public void run() {
        // execute the task and then notify parent
        parentNotificationHandler.handleNotification(new Notification("done", null));
    }
}
}

Here the notification travels Level3 -> Level2 -> Level1. Every child task is responsible for notifying its parent once it has finished its own job. Once all child tasks have notified the parent, the parent task performs its post-run actions and then notifies its own parent.
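
The rule described above can be reduced to a small standalone sketch. The names here (NotifyUpDemo, Node, childDone, runTree) are hypothetical and not part of the sample. It assumes a uniform tree in which every non-leaf node has the same number of children. Each node counts down as its children report in, and reports to its own parent only when the count reaches zero.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a uniform task tree where completion travels from leaves to root.
class NotifyUpDemo {

    static final class Node implements Runnable {
        private final String name;
        private final Node parent;            // null for the root
        private final int fanOut;             // children per non-leaf node (assumed >= 1)
        private final int levelsBelow;        // 0 means this node is a leaf
        private final AtomicInteger pending;  // children that have not reported yet
        private final ExecutorService pool;
        private final ConcurrentLinkedQueue<String> completed;
        private final CountDownLatch rootDone;

        Node(String name, Node parent, int fanOut, int levelsBelow, ExecutorService pool,
             ConcurrentLinkedQueue<String> completed, CountDownLatch rootDone) {
            this.name = name;
            this.parent = parent;
            this.fanOut = fanOut;
            this.levelsBelow = levelsBelow;
            this.pending = new AtomicInteger(fanOut);
            this.pool = pool;
            this.completed = completed;
            this.rootDone = rootDone;
        }

        @Override
        public void run() {
            if (levelsBelow == 0) {
                complete();                   // a leaf is done as soon as its own work is done
                return;
            }
            for (int i = 0; i < fanOut; i++) {
                pool.submit(new Node(name + "." + i, this, fanOut, levelsBelow - 1,
                        pool, completed, rootDone));
            }
            // no blocking here: this node completes later, from childDone()
        }

        // Called by each child, on whatever pool thread that child ran on.
        void childDone() {
            if (pending.decrementAndGet() == 0) {
                complete();
            }
        }

        private void complete() {
            completed.add(name);
            if (parent != null) {
                parent.childDone();
            } else {
                rootDone.countDown();
            }
        }
    }

    // Runs the whole tree and returns node names in the order they completed.
    static List<String> runTree(int fanOut, int levels) {
        ExecutorService pool = Executors.newCachedThreadPool();
        ConcurrentLinkedQueue<String> completed = new ConcurrentLinkedQueue<>();
        CountDownLatch rootDone = new CountDownLatch(1);
        try {
            pool.submit(new Node("root", null, fanOut, levels, pool, completed, rootDone));
            if (!rootDone.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tree did not finish in time");
            }
            return new ArrayList<>(completed);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> order = runTree(2, 2);
        // prints "7 nodes finished; last: root"
        System.out.println(order.size() + " nodes finished; last: " + order.get(order.size() - 1));
    }
}
```

The order in which siblings finish varies from run to run, but a parent can never appear in the list before all of its children, so the root is always last.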

It does not matter which threads from the pool execute the tasks. What matters is that every child task follows the rule of notifying its parent, and that each parent then performs its post-run actions and notifies its own parent in turn.
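
One way to see that the choice of thread is irrelevant is to run a parent and its children on a pool with a single worker thread. The sketch below uses hypothetical names (SharedPoolDemo, runOn) and assumes at least one child. Because the parent task never blocks waiting for its children, the one worker can run the parent first and then each child in turn.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: parent and child tasks sharing one executor, with no blocking waits.
class SharedPoolDemo {

    // Runs one parent task with `children` child tasks (assumed >= 1) on the given pool
    // and returns the names of the worker threads that executed them.
    static Set<String> runOn(ExecutorService pool, int children) {
        Set<String> workers = ConcurrentHashMap.newKeySet();
        AtomicInteger pending = new AtomicInteger(children);
        CountDownLatch parentNotified = new CountDownLatch(1);

        pool.submit(() -> {
            workers.add(Thread.currentThread().getName());
            for (int i = 0; i < children; i++) {
                pool.submit(() -> {
                    workers.add(Thread.currentThread().getName());
                    // the last child to finish is the one that notifies the parent
                    if (pending.decrementAndGet() == 0) {
                        parentNotified.countDown();
                    }
                });
            }
            // the parent returns immediately and frees its thread for the children
        });

        try {
            if (!parentNotified.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("children did not finish in time");
            }
            return workers;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Set<String> workers = runOn(Executors.newSingleThreadExecutor(), 4);
        // prints "worker threads used: 1"
        System.out.println("worker threads used: " + workers.size());
    }
}
```

If the parent instead blocked on its children's results inside the same single-thread pool, the children could never start. The notification style avoids that problem.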

The Notification class consists of a result and a rootException, which the child task sets so that the parent knows what went wrong in the child and the exception can travel up to the top level.
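
As a rough illustration of how a failure could travel upward, the sketch below has each handler look through the notifications it received and forward the first rootException it finds to its own parent. The class and method names (FailurePropagationDemo, Handler, runDemo) and the `summary` future are hypothetical additions. Forwarding only the first failure is an assumption, since the sample does not prescribe an aggregation policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: a child failure is forwarded level by level up to the root.
class FailurePropagationDemo {

    static final class Notification {
        final Object result;
        final Exception rootException;

        Notification(Object result, Exception rootException) {
            this.result = result;
            this.rootException = rootException;
        }
    }

    static final class Handler {
        private final Handler parent;   // null for the root
        private final List<Notification> received = new ArrayList<>();
        private int pending;
        // completed with this handler's own outcome once every child has reported
        final CompletableFuture<Notification> summary = new CompletableFuture<>();

        Handler(Handler parent, int expected) {
            this.parent = parent;
            this.pending = expected;
        }

        // synchronized because children report from different pool threads
        synchronized void handleNotification(Notification n) {
            received.add(n);
            if (--pending > 0) {
                return;
            }
            Exception failure = null;
            for (Notification r : received) {
                if (r.rootException != null) {
                    failure = r.rootException;  // assumption: forward the first failure only
                    break;
                }
            }
            Notification own = new Notification(failure == null ? "done" : null, failure);
            summary.complete(own);
            if (parent != null) {
                parent.handleNotification(own);
            }
        }
    }

    // Builds root -> 2 mid-level handlers -> 2 leaf tasks each; the leaf whose id
    // equals failingLeaf throws. Returns the outcome seen by the root.
    static Notification runDemo(int failingLeaf) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            Handler root = new Handler(null, 2);
            int leaf = 0;
            for (int m = 0; m < 2; m++) {
                Handler mid = new Handler(root, 2);
                for (int l = 0; l < 2; l++) {
                    final int id = leaf++;
                    pool.submit(() -> {
                        Notification n;
                        try {
                            if (id == failingLeaf) {
                                throw new IllegalStateException("leaf " + id + " failed");
                            }
                            n = new Notification("leaf " + id, null);
                        } catch (Exception e) {
                            n = new Notification(null, e);  // report the failure, do not swallow it
                        }
                        mid.handleNotification(n);
                    });
                }
            }
            return root.summary.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Notification outcome = runDemo(3);
        // prints "Failed: leaf 3 failed"
        System.out.println(outcome.rootException == null
                ? "Success"
                : "Failed: " + outcome.rootException.getMessage());
    }
}
```

Each child catches its own exception and always sends a notification. Otherwise the parent's counter would never reach zero and the failure would be lost inside the executor.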