How to run parallel tasks inside parallel tasks

Can we create a thread-pool ExecutorService inside another ExecutorService? Can anyone suggest how to run parallel tasks inside parallel tasks?

Suppose there are 10 tasks that need to run in parallel, and inside each task I have to run 100 parallel tasks. Any suggestions, please?

ExecutorService executor1 = Executors.newFixedThreadPool(8);
for (int i = 0; i < 8; i++) {
    ExecutorService executor2 = Executors.newFixedThreadPool(115);
    for (int j = 0; j < 115; j++) {
        Runnable worker = new UpdatecheckerTest(Region.getRegion(Regions.US_EAST_1),"");
        executor2.execute(worker);
    }
}
executor1.shutdown();

Is this the correct approach?

There is 1 answer

Answered by amos guata

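This approach can work, but note that, as written, executor1 never receives any tasks (the outer loop runs on the calling thread) and the executor2 pools are never shut down. Beyond that, I think the right solution depends on a few other things that you are not mentioning.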

Simple Case

If the problem you are trying to solve is simple and short, is not a big part of your overall system, and performance or stability is not much of a concern, I wouldn't bother with a thread pool at all and would just use parallel streams.

Your code could look something like this:

IntStream.range(0,8).parallel().forEach(i -> {
    IntStream.range(0,115).parallel().forEach(j -> {
        new UpdatecheckerTest(Region.getRegion(Regions.US_EAST_1),"").run();
    });
});
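
One thing worth knowing before relying on this: parallel streams run on the JVM's common ForkJoinPool by default, so the number of tasks that actually run at the same time is tied to the available cores rather than to 8 x 115. The sketch below is only an illustration under that assumption; `NestedStreamsDemo` and `runAll` are made-up names, and the counter stands in for the real `UpdatecheckerTest` work.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class NestedStreamsDemo {

    // Runs outer x inner placeholder tasks with nested parallel streams.
    // Returns how many tasks ran; the names of the threads that ran them
    // are collected into 'threads'.
    public static int runAll(int outer, int inner, Set<String> threads) {
        AtomicInteger completed = new AtomicInteger();
        IntStream.range(0, outer).parallel().forEach(i ->
            IntStream.range(0, inner).parallel().forEach(j -> {
                threads.add(Thread.currentThread().getName());
                completed.incrementAndGet(); // placeholder for the real work
            }));
        // forEach is a terminal operation, so every task has finished here.
        return completed.get();
    }

    public static void main(String[] args) {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        int done = runAll(8, 115, threads);
        System.out.println("tasks completed: " + done);
        System.out.println("distinct threads used: " + threads.size());
        System.out.println("common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
    }
}
```

On most machines the number of distinct threads printed should be far smaller than 920, which is fine for CPU-bound work but can be a bottleneck if each task mostly waits on the network.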

Main part of the overall system

If the problem you are trying to solve is a major part of your system, then what you are describing looks to me like a large task (what happens inside the outer loop, the i loop) and a small task (what happens inside the inner loop, the j loop). If those tasks play a main role in your system, you might want to put them in their own classes to make them more readable, reusable, and easier to change later on. Your code could look something like this:

SmallTask.java

import java.text.MessageFormat;

public class SmallTask implements Runnable {
    private String identifier;

    public SmallTask (String identifier) {
        this.identifier = identifier;
    }

    @Override
    public void run() {
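        // MessageFormat.format already returns the final string; wrapping it in
        // String.format would misinterpret any '%' in the identifier.
        System.out.println(MessageFormat.format("Executing SmallTask with id: {0}", identifier));
        // Whatever happens in new UpdatecheckerTest(Region.getRegion(Regions.US_EAST_1), "").run() goes here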
    }
}

LargeTask.java

import java.text.MessageFormat;
import java.util.stream.IntStream;

public class LargeTask implements Runnable {
    private String identifier;

    public LargeTask (String identifier) {
        this.identifier = identifier;
    }

    @Override
    public void run() {
        System.out.println(MessageFormat.format("Executing LargeTask with id: {0}", identifier));
        // Run the 115 small tasks of this large task in parallel
        IntStream.range(0, 115).parallel().forEach(j -> {
            new SmallTask(identifier + "-" + j).run();
        });
    }
}

Main.java

import java.util.stream.IntStream;

public class Main {

    public static void main(String[] args) {
        IntStream.range(0,8).parallel().forEach(i -> {
            new LargeTask(String.valueOf(i)).run();
        });
    }
}

I would even go a step further and say that the large task, or whatever initiates it, could be an event in an event-driven architecture. You could organize your system around different kinds of events, all of which could be executed asynchronously.

Performance and stability matters

If this code runs very frequently in your system, then I would consider using a thread pool, which lets you control how many threads are used and whether the threads allocated to run LargeTask are the same as those allocated to run SmallTask.

Then your code could look something like this:

SmallTask.java

import java.text.MessageFormat;

public class SmallTask implements Runnable {
    private String identifier;

    public SmallTask (String identifier) {
        this.identifier = identifier;
    }

    @Override
    public void run() {
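        // MessageFormat.format already returns the final string; wrapping it in
        // String.format would misinterpret any '%' in the identifier.
        System.out.println(MessageFormat.format("Executing SmallTask with id: {0}", identifier));
        // Whatever happens in new UpdatecheckerTest(Region.getRegion(Regions.US_EAST_1), "").run() goes here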
    }
}

LargeTask.java

import java.text.MessageFormat;
import java.util.stream.IntStream;

public class LargeTask implements Runnable {
    private String identifier;

    public LargeTask (String identifier) {
        this.identifier = identifier;
    }

    @Override
    public void run() {
        System.out.println(MessageFormat.format("Executing LargeTask with id: {0}", identifier));
        // Hand the 115 small tasks to the dedicated small-task pool; this loop only submits them
        IntStream.range(0, 115).forEach(j -> {
            TasksExecutor.getSmallTaskExecutor().execute(new SmallTask(identifier + "-" + j));
        });
    }
}

TasksExecutor.java

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TasksExecutor {
    // One pool per task size, so large and small tasks never compete for the same threads
    private static final ExecutorService largeTasksExecutor = Executors.newFixedThreadPool(8);
    private static final ExecutorService smallTaskExecutor = Executors.newFixedThreadPool(115);

    public static ExecutorService getLargeTaskExecutor () {
        return largeTasksExecutor;
    }

    public static ExecutorService getSmallTaskExecutor () {
        return smallTaskExecutor;
    }
}

Main.java

import java.util.stream.IntStream;

public class Main {

    public static void main(String[] args) {
        IntStream.range(0,8).forEach(i -> {
            TasksExecutor.getLargeTaskExecutor().execute(new LargeTask(String.valueOf(i)));
        });
    }
}

Don't forget to add functionality to shut down the thread pools when they are no longer needed; otherwise their threads keep the JVM alive after main returns. You may also want some sort of dependency injection between each task and the specific thread pool that should manage it, which will give you more flexibility later on.
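
Both suggestions can be sketched together. This is a minimal illustration, not a drop-in replacement for the classes above: `PoolLifecycleDemo`, `InjectedLargeTask`, `runAll`, and `shutdownAndWait` are hypothetical names, the one-minute timeout is an arbitrary assumption, and a counter stands in for the real small-task work.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolLifecycleDemo {

    // Hypothetical variant of LargeTask: the pool for its small tasks is
    // passed in through the constructor instead of looked up statically.
    static class InjectedLargeTask implements Runnable {
        private final ExecutorService smallTaskExecutor;
        private final int smallTaskCount;
        private final AtomicInteger completed;

        InjectedLargeTask(ExecutorService smallTaskExecutor, int smallTaskCount,
                          AtomicInteger completed) {
            this.smallTaskExecutor = smallTaskExecutor;
            this.smallTaskCount = smallTaskCount;
            this.completed = completed;
        }

        @Override
        public void run() {
            for (int j = 0; j < smallTaskCount; j++) {
                // Placeholder small task: just count that it ran.
                smallTaskExecutor.execute(completed::incrementAndGet);
            }
        }
    }

    // Stops accepting new tasks, then waits for the queued ones to finish.
    static void shutdownAndWait(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                pool.shutdownNow(); // timeout is an assumption; tune as needed
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static int runAll(int largeTasks, int smallTasksPerLarge) {
        ExecutorService largePool = Executors.newFixedThreadPool(largeTasks);
        ExecutorService smallPool = Executors.newFixedThreadPool(smallTasksPerLarge);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < largeTasks; i++) {
            largePool.execute(new InjectedLargeTask(smallPool, smallTasksPerLarge, completed));
        }
        shutdownAndWait(largePool); // all small tasks have been submitted after this
        shutdownAndWait(smallPool); // all small tasks have finished after this
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("small tasks completed: " + runAll(8, 115));
    }
}
```

The order of the two shutdown calls matters: if the small-task pool were shut down first, large tasks that are still submitting work would get a RejectedExecutionException.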

If you want to take it a step further, you could instead use a messaging framework such as ZeroMQ or Kafka, where different queues manage all the tasks that need to take place.
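
To give a feel for the queue-based shape without pulling in a broker, here is a rough in-process sketch that uses a `java.util.concurrent.BlockingQueue` as a stand-in. It does not use the ZeroMQ or Kafka APIs; `QueueSketch`, `runAll`, and the "STOP" marker are hypothetical names chosen for illustration, and a counter replaces the real work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class QueueSketch {

    // Hypothetical marker message telling a consumer to stop.
    private static final String STOP = "STOP";

    public static int runAll(int largeTasks, int smallPerLarge, int consumers) {
        BlockingQueue<String> smallTaskQueue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();

        // Consumers: each thread takes messages until it sees the STOP marker.
        List<Thread> workers = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        String message = smallTaskQueue.take();
                        if (message.equals(STOP)) {
                            return;
                        }
                        processed.incrementAndGet(); // placeholder for the real work
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.start();
            workers.add(worker);
        }

        // Producer: every "large task" publishes one message per small task.
        for (int i = 0; i < largeTasks; i++) {
            for (int j = 0; j < smallPerLarge; j++) {
                smallTaskQueue.add(i + "-" + j);
            }
        }
        // One STOP marker per consumer, queued after all real messages.
        for (int c = 0; c < consumers; c++) {
            smallTaskQueue.add(STOP);
        }

        // Wait for every consumer to drain its share and exit.
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("messages processed: " + runAll(8, 115, 4));
    }
}
```

With a real messaging framework the producers and consumers would typically live in separate processes, and the number of consumers would become a deployment setting rather than a constructor argument.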