Configuring Maximum Concurrent Message Processing in Amazon MQ w/ActiveMQ with JMS

I'm trying to understand how to integrate Amazon MQ using the ActiveMQ Engine and the JMS protocol into my application. The main goal is to slow down certain processes and execute them sequentially in a queue.

I've been exploring JMS integration because it makes it simple to implement a listener that asynchronously receives messages whenever a sender pushes them to the queue. However, it's unclear to me whether the messages are consumed one by one. I require that each consumer processes only one message at a time. With one consumer per node, that lets me bound the number of messages being processed across the entire cluster of nodes at any given moment.

Or, at the very least, I need a way to set a maximum number of messages processed by a consumer simultaneously.

Here's an example of my code for a consumer deployed on each node:

@Singleton
public class MmfgActiveMqConsumer {

    private static final Logger LOG = LogManager.getLogger(MmfgActiveMqConsumer.class);

    // Specify the connection parameters.
    private final static String WIRE_LEVEL_ENDPOINT
            = "ssl://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:61617";
    private final static String ACTIVE_MQ_USERNAME = "MyUsername123";
    private final static String ACTIVE_MQ_PASSWORD = "MyPassword456";

    @PostConstruct
    public void init() {
        // Start to listen to ActiveMq Queue
        listenToQueue();
    }

    public void listenToQueue() {
        // Configure connection and session

        new Thread(() -> {
            try {
                final ActiveMQConnectionFactory connectionFactory = createActiveMQConnectionFactory();
                receiveMessage(connectionFactory);
            } catch (Exception e) {
                // Log the failure instead of silently interrupting the thread.
                LOG.error("Failed to start the ActiveMq consumer", e);
            }
        }).start();
    }

    private static void receiveMessage(ActiveMQConnectionFactory connectionFactory) throws JMSException {
        // Establish a connection for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        final Connection consumerConnection = connectionFactory.createConnection();
        consumerConnection.start();

        // Create a session.
        final Session consumerSession = consumerConnection
                .createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create a queue named "MyQueue".
        final Destination consumerDestination = consumerSession
                .createQueue("MyQueue");

        // Create a message consumer from the session to the queue.
        final MessageConsumer consumer = consumerSession
                .createConsumer(consumerDestination);

        consumer.setMessageListener(new ActiveMqMessageListener());

        // Thread sleep logic
        //TODO
        // Clean up the consumer.
        consumer.close();
        consumerSession.close();

    }

    private static ActiveMQConnectionFactory createActiveMQConnectionFactory() {
        // Create a connection factory.
        final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(WIRE_LEVEL_ENDPOINT);

        // Pass the sign-in credentials.
        connectionFactory.setUserName(ACTIVE_MQ_USERNAME);
        connectionFactory.setPassword(ACTIVE_MQ_PASSWORD);
        return connectionFactory;
    }
}

This is the Listener that is set inside the consumer:

public class ActiveMqMessageListener implements MessageListener {

    private static final Logger LOG = LogManager.getLogger(ActiveMqMessageListener.class);

    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage textMessage) {
            try {
                String payload = textMessage.getText();
                LOG.info("Message received from ActiveMq Queue: " + payload);
            } catch (JMSException e) {
                // Pass the exception as an argument so the stack trace is logged.
                LOG.error("Error receiving message from ActiveMq Queue", e);
            }
        }
    }
}

Is it correct that each time a message is pushed to the queue, the onMessage() method is invoked once, and the next message will wait in the queue until the method finishes its execution?

How would you recommend achieving this or is there a specific configuration in Amazon MQ or ActiveMQ that allows setting a maximum limit on the number of messages processed concurrently by a JMS consumer? Any insights or code examples would be greatly appreciated.

There is 1 answer

Matt Pavlovich wrote:

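Yes, onMessage() is called with one message at a time. A JMS session delivers messages to its listener serially, so the next message is not handed to onMessage() until the current call returns.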
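If you want to confirm this on your own nodes, one option is to count in-flight messages yourself. The following is a minimal sketch in plain Java; `InFlightTracker` is a hypothetical helper that is not part of JMS or ActiveMQ, and it assumes you wrap the body of onMessage() in `track(...)`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: records how many units of work run at the same time. */
final class InFlightTracker {
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    /** Runs one unit of work (for example the body of onMessage) and records concurrency. */
    void track(Runnable work) {
        int now = inFlight.incrementAndGet();
        // Remember the highest in-flight count seen so far.
        peak.accumulateAndGet(now, Math::max);
        try {
            work.run();
        } finally {
            inFlight.decrementAndGet();
        }
    }

    /** Highest number of simultaneous track() calls observed. */
    int peak() {
        return peak.get();
    }

    /** Number of track() calls currently running. */
    int inFlight() {
        return inFlight.get();
    }
}
```

With a single session and a single listener, `peak()` should stay at 1. A higher value would suggest that more than one session or consumer is sharing the tracker.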

If you want a strict one-message-per-consumer design, you'll want to use a couple of features:

  1. The ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE acknowledgement mode, passed to createSession() in place of Session.AUTO_ACKNOWLEDGE. When your processing is complete in onMessage(), call message.acknowledge():

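     public void onMessage(Message message) {
        try {
            // ... do things ...
            message.acknowledge(); // acknowledge only after processing has finished
        } catch (JMSException e) {
            // acknowledge() failed; the message stays unacknowledged
        }
     }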
    
  2. Set prefetch to 0 or 1
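Prefetch can be set without extra code, through ActiveMQ's URI options: `jms.prefetchPolicy.queuePrefetch` on the broker URL applies to the whole connection, and `consumer.prefetchSize` on the queue name applies to a single consumer. The sketch below only builds those strings; `PrefetchOptions` and its method names are hypothetical, and the value 1 in the usage note is just one of the two values suggested above.

```java
/** Hypothetical helper that appends ActiveMQ prefetch options to URIs. */
final class PrefetchOptions {
    private PrefetchOptions() {
    }

    /** Connection-wide queue prefetch, set through the broker URL. */
    static String brokerUrlWithQueuePrefetch(String brokerUrl, int prefetch) {
        return append(brokerUrl, "jms.prefetchPolicy.queuePrefetch=" + prefetch);
    }

    /** Per-consumer prefetch, set through a destination option on the queue name. */
    static String queueNameWithPrefetch(String queueName, int prefetch) {
        return append(queueName, "consumer.prefetchSize=" + prefetch);
    }

    /** Uses '?' for the first option and '&' for any further ones. */
    private static String append(String base, String option) {
        return base + (base.contains("?") ? "&" : "?") + option;
    }
}
```

In the question's code, this would mean passing `PrefetchOptions.brokerUrlWithQueuePrefetch(WIRE_LEVEL_ENDPOINT, 1)` to the `ActiveMQConnectionFactory` constructor, or `PrefetchOptions.queueNameWithPrefetch("MyQueue", 1)` to `createQueue(...)`.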

Common use cases for a strict one-message-per-consumer design are long-running tasks or industrial applications. For general enterprise data processing, it is not a practical design pattern.

Keep in mind that this design drastically reduces consuming performance, because the consumer spends time waiting on the back-and-forth communication with the broker for each message. The big advantage of ActiveMQ is its ability to use prefetch, which overcomes this latency by sending messages to the consumer ahead of time.

Another way to think about the design is that each consumer is responsible for a batch of messages, for example 1,000 or 100. Whatever batch size works for your app, use that as the prefetch to keep messages moving.
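To get a feel for the trade-off, here is a back-of-the-envelope model in Java. It is a deliberate simplification, not a description of how the broker schedules dispatch: it assumes the consumer pays one broker round trip per batch of `prefetch` messages and processes the batch sequentially. `PrefetchEstimate` and all the numbers used with it are hypothetical.

```java
/** Rough throughput model; the formula is a simplifying assumption. */
final class PrefetchEstimate {
    private PrefetchEstimate() {
    }

    /**
     * Estimated messages per second for one consumer, assuming one broker
     * round trip is paid per batch of `prefetch` messages.
     */
    static double messagesPerSecond(double processingMs, double roundTripMs, int prefetch) {
        double batchMs = roundTripMs + prefetch * processingMs;
        return prefetch * 1000.0 / batchMs;
    }
}
```

With made-up values of 1 ms processing time and a 4 ms round trip, `messagesPerSecond(1.0, 4.0, 1)` gives 200 messages per second, while `messagesPerSecond(1.0, 4.0, 100)` gives roughly 962. For long-running tasks the round trip is negligible next to the processing time, which is why a prefetch of 1 is acceptable there.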