I have two processes, as shown below. Each process has a run and a shutdown method.
Process processA = new ProcessA("processA", getProcessAProperties());
Process processB = new ProcessB("processB", getProcessBProperties());
- I want a different thread pool configuration for each Process, so that ProcessA runs in its own thread pool and ProcessB runs in its own, independently of each other.
- I cannot share a Process object between the threads of its thread pool.
Below is what my Process class looks like. My ProcessA and ProcessB classes simply extend Process, and I do all the important work in the run method.
public abstract class Process implements Runnable {
    private Properties props;
    private String processName;

    public Process(String processName, Properties props) {
        this.processName = processName;
        this.props = props;
    }

    protected abstract void shutdown();

    protected abstract void run(String processName, Properties props);

    @Override
    public final void run() {
        run(processName, props);
    }

    public Properties getProps() {
        return props;
    }

    public void setProps(Properties props) {
        this.props = props;
    }

    public String getProcessName() {
        return processName;
    }

    public void setProcessName(String processName) {
        this.processName = processName;
    }
}
Below is a simple example of how I run ProcessA with its own thread pool. There are three threads, and each thread gets its own ProcessA object to work on. Now I want to extend this in a more generic way so that it works for both ProcessA and ProcessB.
public static void main(String[] args) {
    int numberOfThreads = 3;
    ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
    final List<Process> processes = new ArrayList<>();
    for (int i = 0; i < numberOfThreads; i++) {
        // each thread works on a different Process object
        Process processA = new ProcessA("processA", getProcessAProperties());
        processes.add(processA);
        executor.submit(processA);
    }
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            for (Process process : processes) {
                process.shutdown();
            }
            executor.shutdown();
            try {
                executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
}
So to solve this problem in a more generic way, I created a Process handler as shown below:
public final class ProcessHandler {
    private final ExecutorService executorServiceProcess;
    private final List<Process> processes = new ArrayList<>();
    private final Thread shutdownHook = new Thread() {
        @Override
        public void run() {
            for (Process process : processes)
                process.shutdown();
            executorServiceProcess.shutdown();
        }
    };

    public ProcessHandler(Process process, int poolSize) {
        this.executorServiceProcess = Executors.newFixedThreadPool(poolSize);
        Runtime.getRuntime().addShutdownHook(shutdownHook);
        for (int i = 0; i < poolSize; i++) {
            try {
                // this line throws exception
                Process p = process.getClass().newInstance();
                p.setProcessName(process.getProcessName());
                p.setProps(process.getProps());
                processes.add(p);
                executorServiceProcess.submit(p);
            } catch (InstantiationException | IllegalAccessException e) {
                e.printStackTrace();
            }
        }
    }

    public void shutdown() {
        Runtime.getRuntime().removeShutdownHook(shutdownHook);
        shutdownHook.start();
        try {
            shutdownHook.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
}
And this is the way my main method looks now:
public static void main(String[] args) {
    Process processA = new ProcessA("processA", getProcessAProperties());
    Process processB = new ProcessB("processB", getProcessBProperties());
    // processA will run with three threads in its own thread pool
    ProcessHandler processHandlerA = new ProcessHandler(processA, 3);
    // processB will run with two threads in its own thread pool
    ProcessHandler processHandlerB = new ProcessHandler(processB, 2);
    // now I can call shutdown on them
    processHandlerA.shutdown();
    processHandlerB.shutdown();
}
This line in my ProcessHandler class:
Process p = process.getClass().newInstance();
throws this exception:
java.lang.InstantiationException: com.david.test.ProcessA
I am not sure why InstantiationException is being thrown.
Just a note: each of these processes is a Kafka consumer, and Kafka consumers are generally not thread safe, which is why I have to create a new object each time and submit it to the executor.
Update:
This is what my ProcessA class looks like:
public class ProcessA extends Process {
    private KafkaConsumer<byte[], byte[]> consumer;

    public ProcessA(String processName, Properties props) {
        super(processName, props);
    }

    @Override
    public void shutdown() {
        consumer.wakeup();
    }

    @Override
    protected void run(String processName, Properties props) {
        consumer = new KafkaConsumer<>(props);
        System.out.println("Hello World");
        // do all kind of important stuff here
    }
}
Do you have a concrete class that extends your abstract Process class?
Abstract classes cannot be instantiated on their own; see: http://docs.oracle.com/javase/tutorial/java/IandI/abstract.html
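Worth adding: Class.newInstance() also throws InstantiationException when the class is concrete but has no no-argument constructor, which looks like the case for the ProcessA posted in the update (its only constructor takes a name and Properties). Here is a minimal sketch of that behaviour. Base and Concrete are hypothetical stand-ins for Process and ProcessA, not the real classes, and the single String parameter is a simplification.

```java
import java.lang.reflect.Constructor;

public class NewInstanceDemo {
    // Hypothetical stand-ins for Process and ProcessA from the question.
    static abstract class Base {
        final String name;
        Base(String name) { this.name = name; }
    }

    static class Concrete extends Base {
        // Only constructor takes an argument, so there is no no-arg constructor.
        Concrete(String name) { super(name); }
    }

    // Calls Class.newInstance() and reports which exception, if any, it threw.
    @SuppressWarnings("deprecation") // Class.newInstance() is deprecated since Java 9
    static String tryNewInstance(Class<?> type) {
        try {
            type.newInstance();
            return "none";
        } catch (ReflectiveOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    // Looks up the one-argument constructor explicitly and invokes it.
    static Base viaConstructor(String name) {
        try {
            Constructor<Concrete> ctor = Concrete.class.getDeclaredConstructor(String.class);
            return ctor.newInstance(name);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(tryNewInstance(Concrete.class));
        System.out.println(viaConstructor("processA").name);
    }
}
```

If you want to stay with reflection, the same idea applied to your code would be something like process.getClass().getDeclaredConstructor(String.class, Properties.class).newInstance(name, props), assuming every subclass keeps that constructor signature.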
Perhaps try extending your abstract class with a concrete class and creating instances of that concrete class; you can still treat them as Process objects if needed.
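One way to create those concrete instances without reflection is to pass the handler a factory instead of a prototype object, so each pool thread still gets its own instance. The following is only a sketch under assumptions: Worker, WorkerA and Handler are hypothetical, simplified stand-ins for Process, ProcessA and ProcessHandler, the run body is a placeholder rather than a real Kafka poll loop, and the JVM shutdown hook is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class FactoryHandlerDemo {
    // Simplified stand-in for the question's Process class.
    static abstract class Worker implements Runnable {
        final String name;
        final Properties props;
        Worker(String name, Properties props) {
            this.name = name;
            this.props = props;
        }
        abstract void shutdown();
    }

    // Simplified stand-in for ProcessA; the real one would own a KafkaConsumer.
    static class WorkerA extends Worker {
        WorkerA(String name, Properties props) { super(name, props); }
        @Override public void run() { /* placeholder for the consumer loop */ }
        @Override void shutdown() { /* placeholder for consumer.wakeup() */ }
    }

    static final class Handler {
        private final ExecutorService pool;
        private final List<Worker> workers = new ArrayList<>();

        // The factory is called once per thread, so no instance is shared.
        Handler(Supplier<? extends Worker> factory, int poolSize) {
            this.pool = Executors.newFixedThreadPool(poolSize);
            for (int i = 0; i < poolSize; i++) {
                Worker w = factory.get();
                workers.add(w);
                pool.submit(w);
            }
        }

        List<Worker> workers() { return workers; }

        // Signals every worker, then waits for the pool; true if it terminated in time.
        boolean shutdown(long timeoutMillis) {
            for (Worker w : workers) {
                w.shutdown();
            }
            pool.shutdown();
            try {
                return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        Handler handlerA = new Handler(() -> new WorkerA("processA", new Properties()), 3);
        System.out.println(handlerA.workers().size() + " workers, terminated: "
                + handlerA.shutdown(5000));
    }
}
```

With your own classes the call site would look roughly like new ProcessHandler(() -> new ProcessA("processA", getProcessAProperties()), 3), which also removes the need for the setProcessName and setProps setters.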