Thread safety issue while timing out the thread for each bundle

131 views Asked by At

I am working on a project in which I will have different Bundles/Models. Let's take an example, Suppose I have 4 bundles and each of those bundles will have a method name process.

Below are the things, I am supposed to do-

  1. I need to call all those 4 Bundles process method in parallel using multithread and process method in each bundle will return me a map and then write this map into the database in that same thread or whatever is the best approach to do (I am not sure on this which is the right way to go).
  2. And also I want to have some sort of timeout feature enabled at the thread level. Meaning if any Bundle is taking lot of time to execute, then that Bundle thread should get timeout and log as an error stating that this particular bundle got timeout bcoz it was taking lot of time.

The following attempt that I have done is most probably flawed and error handling is by no means complete. Can anybody guide me what I am supposed to do in the error handling cases as well?

Below is my method which will call process method of all the bundles in a multithreaded way.

public void processEvents(final Map<String, Object> eventData) {
    ExecutorService pool = Executors.newFixedThreadPool(5);
    List<ProcessBundleHolderEntry> entries = new ArrayList<ProcessBundleHolderEntry>();

    Map<String, String> outputs = (Map<String, String>)eventData.get(BConstants.EVENT_HOLDER);

    for (BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {
        ProcessBundleHolderEntry processBundleHolderEntry = new ProcessBundleHolderEntry(entry, outputs);
        entries.add(processBundleHolderEntry);
    }

    try {
        List<Future<Object>> futures = pool.invokeAll(entries, 30, TimeUnit.SECONDS);
        for (int i = 0; i < futures.size(); i++) {
            // This works since the list of future objects are in the
            // same sequential order as the list of entries
            Future<Object> future = futures.get(i);
            ProcessBundleHolderEntry entry = entries.get(i);
            if (!future.isDone()) {
                // log error for this entry
            }
        }
    } catch (InterruptedException e) {
        // handle this exception!
    }
}

Secondly, an implementation of Callable for your threads:

public class ProcessBundleHolderEntry implements Callable {
    private BundleRegistration.BundlesHolderEntry entry;
    private Map<String, String> outputs;

    public ProcessBundleHolderEntry(BundleRegistration.BundlesHolderEntry entry, Map<String, String> outputs) {
        this.entry = entry;
        this.outputs = outputs;
    }

    public Object call() throws Exception {
        final Map<String, String> response = entry.getPlugin().process(outputs);
        // write to the database.
        System.out.println(response);
        return response;
    }
}

Can anyone tell me whether there is any problem with the above approach or is there any better and efficient way of doing the same thing? I am not sure whether there is any thread safety issue as well.

Any help will be appreciated on this.

1

There are 1 answers

3
assylias On BEST ANSWER

The only shared object in your code is eventData: as long as it is not modified while this method is running (or if the map and its content is thread safe and changes are safely published) you should be fine.

Regarding exception handling of your tasks, you typically do:

try {
    future.get();
} catch (ExecutionException e) {
    Throwable exceptionInFuture = e.getCause();
    //throw, log or whatever is appropriate
}

Regarding the interrupted exception: it means the thread in which you are executing the method has been interrupted. What you need to do depends on your use case, but you should generally stop what you are doing, so something like:

} catch (InterruptedException e) {
    pool.shutdownNow(); //cancels the tasks
    //restore interrupted flag and exit
    Thread.currentThread.interrupt();
    //or rethrow the exception
    throw e;
}

Note: the purpose of thread pools is to be reused - you should declare the executor service as an (private final) instance variable rather than creating one every time the processEvents method is called.