De-queuing a message from JMS Queue only when a thread in a threadpool is available to process it

The requirement is to listen on a JMS queue, process the requests asynchronously, and send each response on a different JMS queue. The system is only allowed to process a fixed number of requests at any one time, and a request must not be dequeued unless a thread is available to process it.

The current design works as follows:

  1. Dequeue a request from the request queue using a @JmsListener annotated method.
  2. Increment a global counter.
  3. The listener executes the request by calling an @Async annotated method.
  4. If the counter == max size of the threadpool, stop listening. Otherwise the listener is free to pick up a new request.
  5. The Async method processes the request and calls the sendResponse() method in the Producer service.
  6. The producer method sends the response on the response queue and decrements the global counter.
  7. If the listener bean was stopped, the producer method starts it listening again.

The issue: the thread that calls sendResponse() is one of the threadpool threads, so it is not free until sendResponse() returns. This means that even though the counter has been decremented, a thread still isn't available, and the restarted listener may dequeue a request before any thread can take it.
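
To make the issue concrete, here is a minimal plain-Java sketch with no Spring or JMS involved. It assumes a pool with a single worker thread and no backlog queue (the question does not show how "RequestExecutor" is configured, so that setup is an assumption), and the class and method names are hypothetical. The task decrements the counter and then keeps running, which stands in for the remainder of sendResponse(). A second submit made at that moment is rejected even though the counter already reads 0.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CounterRaceDemo {

    /** Returns true if a submit is rejected while the counter already reads 0. */
    public static boolean submitRejectedAfterDecrement() {
        // Assumed setup: one worker thread and no backlog, so a submit
        // only succeeds when the worker is actually idle.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        AtomicInteger inFlight = new AtomicInteger();
        CountDownLatch decremented = new CountDownLatch(1);
        CountDownLatch releaseWorker = new CountDownLatch(1);
        try {
            inFlight.incrementAndGet();
            pool.execute(() -> {
                // Stands in for sendResponse(): the counter drops first...
                inFlight.decrementAndGet();
                decremented.countDown();
                try {
                    // ...but the worker thread is still inside the task.
                    releaseWorker.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            decremented.await();
            try {
                // The counter says a slot is free, yet the worker has not returned.
                pool.execute(() -> { });
                return false;
            } catch (RejectedExecutionException e) {
                return inFlight.get() == 0;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            releaseWorker.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected while counter == 0: " + submitRejectedAfterDecrement());
    }
}
```

With a queued executor the second task would not be rejected but would wait in the executor's queue, which still means the message was dequeued from JMS before a thread was free.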

The Listener:

@Service
public class ActiveMQListener{

    @Autowired
    ApplicationContext applicationContext;

    @Autowired
    AsyncService aSyncService;

    @Autowired
    ThreadPoolCounter threadPoolCounter;

    @JmsListener(id = "req1", destination = "RequestQueue1", containerFactory = "jmsFactory")
    public void receiveRequest(ActiveMQTextMessage message) throws JMSException {
        String requestMsg = message.getText();
        
        //some validation

        aSyncService.executeRequest(requestMsg);

        if(threadPoolCounter.incrementCounter() == 10){
            JmsListenerEndpointRegistry customRegistry = applicationContext.getBean(JmsListenerEndpointRegistry.class);
            MessageListenerContainer listenerContainer = customRegistry.getListenerContainer("req1");
            listenerContainer.stop();
        }
    }
}

The producer:

@Service
public class ActiveMQProducer {

    @Autowired
    JmsTemplate jmsTemplate;

    @Autowired
    ThreadPoolCounter threadPoolCounter;

    @Autowired
    ApplicationContext applicationContext;


    public void sendResponse(ActiveMQTextMessage responseMsg) throws JMSException {
        jmsTemplate.convertAndSend("res1" , responseMsg);

        threadPoolCounter.decrement();

        JmsListenerEndpointRegistry customRegistry = applicationContext.getBean(JmsListenerEndpointRegistry.class);
        MessageListenerContainer listenerContainer = customRegistry.getListenerContainer("req1");

        if(!listenerContainer.isRunning()){
            listenerContainer.start();
        }
    }
}

The Async Method:

@Service
public class AsyncService{

    @Autowired
    ActiveMQProducer activeMQProducer;

    @Async("RequestExecutor")
    public void executeRequest(String requestMsg) throws JMSException {
        String responseMsg = "";

        //Some Async Processing

        ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setText(responseMsg);
        activeMQProducer.sendResponse(message);
    }

}

What I have thought of doing:
Increase the concurrency of the listener to equal the threadpool size, and change the Async method to return a Future. This should ensure that a request is only dequeued when a thread is available to process it. However, in this case the number of threads doubles. For example, if the threadpool size is 10, there will also be 10 listener threads blocked on future.get(). If the number of requests to be processed concurrently has to be increased later, the number of listener threads has to grow with it.

@Service
public class ActiveMQListener{

    @Autowired
    AsyncService aSyncService;

    @Autowired
    ActiveMQProducer activeMQProducer;


    @JmsListener(id = "req1", destination = "RequestQueue1", containerFactory = "jmsFactory")
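    public void receiveRequest(ActiveMQTextMessage message)
            throws JMSException, InterruptedException, ExecutionException {
        String requestMsg = message.getText();

        //some validation

        // Blocks this listener thread until the async method completes
        Future<String> future = aSyncService.executeRequest(requestMsg);
        String responseMsg = future.get();

        // sendResponse() expects an ActiveMQTextMessage, so wrap the text
        ActiveMQTextMessage response = new ActiveMQTextMessage();
        response.setText(responseMsg);
        activeMQProducer.sendResponse(response);
    }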
    }
}
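
The thread-doubling concern can be illustrated with a small plain-Java sketch. It is not the Spring configuration itself: two fixed thread pools stand in for the listener concurrency and for "RequestExecutor", and the class and method names are hypothetical. Each "listener" thread submits work and then blocks on future.get(), so while poolSize requests are in flight, 2 × poolSize threads are occupied.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingListenerDemo {

    /** Returns how many threads are held while poolSize requests are in flight. */
    public static int occupiedThreads(int poolSize) {
        // Assumption: these pools stand in for the listener threads and the async executor.
        ExecutorService listeners = Executors.newFixedThreadPool(poolSize);
        ExecutorService workers = Executors.newFixedThreadPool(poolSize);
        AtomicInteger occupied = new AtomicInteger();
        CountDownLatch allWorkersBusy = new CountDownLatch(poolSize);
        CountDownLatch finish = new CountDownLatch(1);
        List<Future<String>> listenerRuns = new ArrayList<>();
        try {
            for (int i = 0; i < poolSize; i++) {
                listenerRuns.add(listeners.submit(() -> {
                    occupied.incrementAndGet(); // listener thread is held for the whole request
                    Future<String> future = workers.submit(() -> {
                        occupied.incrementAndGet(); // worker thread does the processing
                        allWorkersBusy.countDown();
                        finish.await();             // keep the worker busy until the snapshot is taken
                        return "response";
                    });
                    return future.get(); // listener blocks here, as in receiveRequest()
                }));
            }
            allWorkersBusy.await();
            int snapshot = occupied.get(); // every listener and every worker is occupied now
            finish.countDown();
            for (Future<String> run : listenerRuns) {
                run.get();
            }
            return snapshot;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            finish.countDown();
            listeners.shutdown();
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("threads occupied for 10 requests: " + occupiedThreads(10));
    }
}
```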