I am working on a project in which I will have different Bundles/Models. For example, suppose I have 4 bundles and each of those bundles has a method named process.
Below are the things I am supposed to do:
- I need to call the process method of all 4 bundles in parallel using multiple threads. The process method in each bundle will return a map, and that map should then be written to the database in the same thread, or in whatever way is the best approach (I am not sure which is the right way to go).
- I also want some sort of timeout feature enabled at the thread level. That is, if any bundle takes too long to execute, that bundle's thread should time out, and an error should be logged stating that this particular bundle timed out because it was taking too long.
The following attempt is most probably flawed, and its error handling is by no means complete. Can anybody advise on what I should do in the error-handling cases as well?
Below is my method, which calls the process method of all the bundles in a multithreaded way.
    public void processEvents(final Map<String, Object> eventData) {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        List<ProcessBundleHolderEntry> entries = new ArrayList<ProcessBundleHolderEntry>();
        Map<String, String> outputs = (Map<String, String>)eventData.get(BConstants.EVENT_HOLDER);
        for (BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {
            ProcessBundleHolderEntry processBundleHolderEntry = new ProcessBundleHolderEntry(entry, outputs);
            entries.add(processBundleHolderEntry);
        }
        try {
            List<Future<Object>> futures = pool.invokeAll(entries, 30, TimeUnit.SECONDS);
            for (int i = 0; i < futures.size(); i++) {
                // This works since the list of future objects are in the
                // same sequential order as the list of entries
                Future<Object> future = futures.get(i);
                ProcessBundleHolderEntry entry = entries.get(i);
                if (!future.isDone()) {
                    // log error for this entry
                }
            }
        } catch (InterruptedException e) {
            // handle this exception!
        }
    }
Secondly, an implementation of Callable for your threads:
    public class ProcessBundleHolderEntry implements Callable {
        private BundleRegistration.BundlesHolderEntry entry;
        private Map<String, String> outputs;

        public ProcessBundleHolderEntry(BundleRegistration.BundlesHolderEntry entry, Map<String, String> outputs) {
            this.entry = entry;
            this.outputs = outputs;
        }

        public Object call() throws Exception {
            final Map<String, String> response = entry.getPlugin().process(outputs);
            // write to the database.
            System.out.println(response);
            return response;
        }
    }
Can anyone tell me whether there is any problem with the above approach, or whether there is a better and more efficient way of doing the same thing? I am also not sure whether there are any thread-safety issues.
Any help will be appreciated on this.
The only shared object in your code is eventData: as long as it is not modified while this method is running (or if the map and its contents are thread safe and changes are safely published), you should be fine.
Regarding exception handling of your tasks, you typically do:
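A minimal sketch of that pattern, not the original snippet: each future is inspected after invokeAll returns, and an exception thrown inside call() is unwrapped from the ExecutionException. The class name FutureErrorHandlingSketch, the runAll method, and the status strings are hypothetical, and the String tasks stand in for the bundles' process calls. Note that once invokeAll returns, isDone() is true for every future, so a timed-out task is detected with isCancelled() rather than !isDone().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the tasks stand in for the bundles' process() calls.
class FutureErrorHandlingSketch {

    // Runs all tasks with a shared timeout and returns one status line per task,
    // in the same order as the input list.
    static List<String> runAll(List<Callable<String>> tasks, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> statuses = new ArrayList<>();
        try {
            List<Future<String>> futures =
                    pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            for (Future<String> future : futures) {
                if (future.isCancelled()) {
                    // invokeAll cancels tasks that did not finish in time.
                    statuses.add("TIMEOUT");
                    continue;
                }
                try {
                    statuses.add("OK: " + future.get());
                } catch (ExecutionException e) {
                    // The exception thrown inside call() is available as the cause.
                    statuses.add("FAILED: " + e.getCause().getMessage());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag for callers
            statuses.add("INTERRUPTED");
        } finally {
            pool.shutdownNow(); // the pool is local to this sketch, so release it here
        }
        return statuses;
    }
}
```

In real code the status strings would presumably be replaced by logging or by rethrowing, whichever is appropriate for the bundle that failed.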
Regarding the interrupted exception: it means the thread in which you are executing the method has been interrupted. What you need to do depends on your use case, but you should generally stop what you are doing, so something like:
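One way this could look, as a sketch under assumptions rather than the original snippet: on interruption the pool is shut down (which cancels the running tasks) and the interrupted flag is restored so that callers can see it. The class InterruptHandlingSketch, the runAll method, and its boolean return value are hypothetical; rethrowing the InterruptedException instead would be an equally reasonable choice.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class illustrating one way to react to InterruptedException.
class InterruptHandlingSketch {

    // Returns true if invokeAll returned normally,
    // false if the calling thread was interrupted while waiting.
    static boolean runAll(ExecutorService pool, List<Callable<String>> tasks) {
        try {
            List<Future<String>> futures = pool.invokeAll(tasks, 30, TimeUnit.SECONDS);
            return futures.size() == tasks.size();
        } catch (InterruptedException e) {
            // Stop what we are doing: interrupt the running tasks and
            // discard the ones that have not started yet.
            pool.shutdownNow();
            // Restore the interrupted flag so code further up the stack can react.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```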
Note: the purpose of thread pools is to be reused - you should declare the executor service as a (private final) instance variable rather than creating one every time the processEvents method is called.
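As a rough illustration of that note, the sketch below keeps the pool in a private final field and reuses it across calls. The class EventProcessorSketch, the simplified processEvents signature (a list of String tasks instead of the event map), and the shutdown method are assumptions made for the example, not part of the original code.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical class: the pool is created once and shared by every call.
class EventProcessorSketch {
    private final ExecutorService pool = Executors.newFixedThreadPool(5);

    // Returns how many tasks finished (normally or with an exception)
    // before the timeout, i.e. how many were not cancelled by invokeAll.
    int processEvents(List<Callable<String>> tasks) {
        int finished = 0;
        try {
            for (Future<String> future : pool.invokeAll(tasks, 30, TimeUnit.SECONDS)) {
                if (!future.isCancelled()) {
                    finished++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag for callers
        }
        return finished;
    }

    // Call once when the owning component is being stopped, so the
    // pool's worker threads do not keep the JVM alive.
    void shutdown() {
        pool.shutdown();
    }
}
```

With this layout, the threads are started once and then reused, at the cost of having to shut the pool down explicitly when the owning object is no longer needed.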