I am new to core Java programming and I am currently working on a project related to multi-threading.
We have an asynchronous service that multiple clients call. Each client has its own consumer, which consumes and processes requests from a queue using a fixed number of threads assigned to that client. The problem is that some clients send heavy traffic on weekdays while others send it on weekends. For example, client 1 sends 900 requests per second on weekdays but only 200 on weekends, while client 2 sends 100 on weekdays but more than 1000 on weekends.
Because of this, we need to update the thread allocation for each client at runtime without restarting the service. As I understand it, setCorePoolSize changes the core pool size and terminates excess threads once they become idle. But in our setup, each task submits the next task to the pool before it starts its own work, so the queue is never empty, threads never become idle, and reducing the pool size could be a problem.
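For reference, here is a minimal sketch of resizing a plain ThreadPoolExecutor at runtime. The class and method names (PoolResizer, resize) are hypothetical and not part of our service. The one detail it tries to show is call order: on recent JDKs, setCorePoolSize appears to reject a core size above the current maximum, and setMaximumPoolSize rejects a maximum below the current core size, so the two setters are ordered by the direction of the change.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, for illustration only.
final class PoolResizer {
    private PoolResizer() {}

    /** Sets core and maximum size to n, ordering the calls so core never exceeds maximum. */
    static void resize(ThreadPoolExecutor pool, int n) {
        if (n > pool.getMaximumPoolSize()) {
            // Growing: raise the ceiling first, then the core size.
            pool.setMaximumPoolSize(n);
            pool.setCorePoolSize(n);
        } else {
            // Shrinking (or unchanged): lower the core size first, then the ceiling.
            pool.setCorePoolSize(n);
            pool.setMaximumPoolSize(n);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        resize(pool, 8);
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // prints 8/8
        resize(pool, 2);
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // prints 2/2
        pool.shutdown();
    }
}
```

This only changes the configured sizes. When existing worker threads actually go away is a separate question, which is what the rest of this post is about.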
I saw a couple of posts which talked about interrupting threads before or after execution and I tried to write a code snippet using CountDownLatch:
DynamicThreadPool class:
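    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.logging.Logger;

    public class DynamicThreadPool extends ThreadPoolExecutor {
        private static final Logger log = Logger.getLogger(DynamicThreadPool.class.getName());

        private int currentThreadCount; // only touched inside synchronized resize()
        private final AtomicReference<CountDownLatch> executeLatch;

        public DynamicThreadPool(int nThreads) {
            super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
            currentThreadCount = nThreads;
            // A latch at 0 is already open, so tasks run freely until resize() closes it.
            executeLatch = new AtomicReference<>(new CountDownLatch(0));
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            try {
                // Blocks only while a resize is in progress.
                executeLatch.get().await();
            } catch (final InterruptedException e) {
                t.interrupt(); // restore the interrupt flag
            }
        }

        public synchronized void resize(int nThreads) {
            if (currentThreadCount == nThreads) {
                return;
            }
            final CountDownLatch gate = new CountDownLatch(1);
            executeLatch.set(gate); // hold back tasks that have not started yet
            try {
                if (nThreads > currentThreadCount) {
                    // Growing: raise the maximum first so the core size never exceeds it.
                    setMaximumPoolSize(nThreads);
                    setCorePoolSize(nThreads);
                } else {
                    // Shrinking: lower the core size first so the maximum never drops below it.
                    setCorePoolSize(nThreads);
                    setMaximumPoolSize(nThreads);
                }
                currentThreadCount = nThreads;
                prestartAllCoreThreads();
            } catch (Exception e) {
                log.warning("resizer caught exception: " + e);
            } finally {
                gate.countDown(); // always release waiting threads, even if resizing failed
            }
        }
    }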
Consumer class:
    public class Consumer {
        private final int numberOfProcessingThreads;
        private final DynamicThreadPool threadPool; // a field, so start() and the tasks can reach it

        public Consumer(int nThreads) {
            numberOfProcessingThreads = nThreads;
            threadPool = new DynamicThreadPool(nThreads);
        }

        public void start() {
            // Seed the pool with one self-resubmitting task per thread.
            for (int i = 0; i < numberOfProcessingThreads; i++) {
                threadPool.submit(new ProcessRequest());
            }
        }

        public void updateNumberOfThreads(int newThreadCount) {
            threadPool.resize(newThreadCount);
        }

        class ProcessRequest implements Runnable {
            @Override
            public void run() {
                // 1. Logic to submit a new ProcessRequest task if the consumer is consuming messages
                //    (the condition itself is omitted here)
                threadPool.submit(new ProcessRequest());
                // 2. Logic to process request
            }
        }
    }
To summarize, when resize is called, the CountDownLatch is set to 1 so that any thread about to start a task blocks on executeLatch.get().await(). The pool size is then changed, all core threads are started, and the waiting threads are released with countDown. Assuming updateNumberOfThreads is called whenever the thread count needs to change, does this approach make sense?
My concern is that threads which are in the middle of executing a task, and so are not blocked on executeLatch.get().await(), will never become idle and never stop running. (For example, if we reduce the pool size from 20 to 10 but only 5 threads are blocked in beforeExecute, the remaining 15 will never become idle and will keep running.)
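One way to check this concern is a small experiment rather than reasoning about it. The sketch below is an assumption-laden stand-in for the real consumer: ShrinkExperiment, poolSizeAfterShrink, and the 20 ms sleep are made up for illustration. It keeps the queue non-empty with self-resubmitting tasks, shrinks a plain ThreadPoolExecutor, and reports getPoolSize() afterwards. The Javadoc only says excess threads are terminated "when they next become idle", so the observed number is worth verifying on the JDK actually in use.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical experiment, for illustration only. Assumes to <= from.
final class ShrinkExperiment {
    private ShrinkExperiment() {}

    /** Runs `from` self-resubmitting tasks, shrinks the pool to `to`, returns the observed pool size. */
    static int poolSizeAfterShrink(int from, int to) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                from, from, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        Runnable task = new Runnable() {
            @Override
            public void run() {
                try {
                    // Queue the successor first, as in the question, so the queue never drains.
                    pool.execute(this);
                } catch (RejectedExecutionException ignored) {
                    // The pool was shut down at the end of the experiment.
                }
                sleepQuietly(20); // stand-in for request processing
            }
        };
        for (int i = 0; i < from; i++) {
            pool.execute(task);
        }
        sleepQuietly(100);           // let all workers get busy
        pool.setCorePoolSize(to);    // shrinking: core size first, then maximum
        pool.setMaximumPoolSize(to);
        sleepQuietly(500);           // give busy workers time to finish their current task
        int observed = pool.getPoolSize();
        pool.shutdownNow();
        return observed;
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }

    public static void main(String[] args) {
        System.out.println("pool size after shrinking 4 -> 2: " + poolSizeAfterShrink(4, 2));
    }
}
```

If the reported size matches the new target even though no thread was ever idle, the latch in beforeExecute may not be needed for shrinking at all. If it stays at the old size, the concern above is real.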
Is there a better way to achieve this? If I do not submit a new task to the pool before the current one starts executing, how can I make sure that threads are not sitting idle and are always processing requests? Any help is appreciated.