I have a requirement to write an SQS consumer that consumes messages asynchronously from AWS SQS. My assumption is that JMS is multi-threaded and that each invocation of MessageListener's onMessage() is assigned a new thread.
SQSConnectionManager.java:
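    import com.amazon.sqs.javamessaging.ProviderConfiguration;
    import com.amazon.sqs.javamessaging.SQSConnection;
    import com.amazon.sqs.javamessaging.SQSConnectionFactory;
    import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class SQSConnectionManager {
        private static final Logger LOGGER = LoggerFactory.getLogger(SQSConnectionManager.class);
        private SQSConnectionFactory sqsConnectionFactory;
        private SQSConnection sqsConnection;
        private Session sqsSession;

        public SQSConnectionManager() {
        }

        public void createSQSConnection(final String queueName) throws JMSException {
            LOGGER.info("Initializing sqs connection");
            sqsConnectionFactory = new SQSConnectionFactory(
                    new ProviderConfiguration(),
                    AmazonSQSClientBuilder.standard()
                            .build()
            );
            sqsConnection = sqsConnectionFactory.createConnection();
            // One session with one consumer and one listener
            sqsSession = sqsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = sqsSession.createQueue(queueName);
            MessageConsumer sqsConsumer = sqsSession.createConsumer(queue);
            sqsConsumer.setMessageListener(new MyCustomListener());
            sqsConnection.start();
            LOGGER.info("SQS Connection started");
        }
    }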
MyCustomListener.java:
    import com.amazon.sqs.javamessaging.message.SQSTextMessage;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class MyCustomListener implements MessageListener {
        private static final Logger LOGGER = LoggerFactory.getLogger(MyCustomListener.class);

        public MyCustomListener() {}

        @Override
        public void onMessage(Message message) {
            try {
                LOGGER.info("onMessage() Thread name : {}", Thread.currentThread().getName());
                LOGGER.info("onMessage() Thread id : {}", Thread.currentThread().getId());
                LOGGER.info("Reading incoming sqs message");
                final SQSTextMessage sqsTextMessage = (SQSTextMessage) message;
                final String receivedMessage = sqsTextMessage.getText();
                LOGGER.info("Received sqs message : {}", receivedMessage);
                helper(receivedMessage);
            } catch (JMSException e) {
                // Log the exception itself; getCause() may be null
                LOGGER.error("Failed to read incoming sqs message", e);
            }
        }

        private void helper(final String sqsMessage) {
            LOGGER.info("helper() Thread name : {}", Thread.currentThread().getName());
            LOGGER.info("helper() Thread id : {}", Thread.currentThread().getId());
            LOGGER.info("sqs message : {}", sqsMessage);
        }
    }
Application.java:
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class Application {
        private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);

        public static void main(String[] args) throws Exception {
            SQSConnectionManager sqsConnectionManager = new SQSConnectionManager();
            sqsConnectionManager.createSQSConnection("test-queue");
        }
    }
AWS Maven Dependency:
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>amazon-sqs-java-messaging-lib</artifactId>
        <version>1.1.0</version>
    </dependency>
This Java app is deployed to AWS as an Elastic Beanstalk application.
When I check the logs in CloudWatch, I see the same thread ID for both onMessage() and helper().
Can anyone please help me understand how a JMS listener handles threading? Does it ensure that execution is multi-threaded?

Your assumption about multi-threaded support in JMS is incorrect. In particular, the Session and MessageConsumer objects are not thread-safe. Asynchronous message consumption by a MessageListener is serial (i.e. not parallel or concurrent), so you're seeing the expected behavior. Note also that helper() is an ordinary method call from onMessage(), so it always runs on whichever thread invoked onMessage().
This is described in more detail in section 2.14 of the Jakarta Messaging 3.1 specification, which also explains why it imposes concurrency limits on the Session.
Therefore, if you want concurrent consumption in your case, you need to create additional sessions and consumers.
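As a rough sketch of that approach, the setup from the question could be adapted to open several sessions on one connection, each with its own consumer and listener instance. The class name ConcurrentSQSConsumers and the consumerCount parameter are made up for illustration; MyCustomListener is the listener from the question. It needs the amazon-sqs-java-messaging-lib dependency from the question and valid AWS credentials to run, so treat it as a starting point rather than a tested implementation.

```java
import com.amazon.sqs.javamessaging.ProviderConfiguration;
import com.amazon.sqs.javamessaging.SQSConnection;
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;

import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;

// Hypothetical helper: one connection, several independent sessions.
public class ConcurrentSQSConsumers {

    public static SQSConnection start(String queueName, int consumerCount) throws JMSException {
        SQSConnectionFactory factory = new SQSConnectionFactory(
                new ProviderConfiguration(),
                AmazonSQSClientBuilder.standard().build());
        SQSConnection connection = factory.createConnection();

        for (int i = 0; i < consumerCount; i++) {
            // Each session delivers to its listener serially, but separate
            // sessions are free to deliver at the same time.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(queueName);
            MessageConsumer consumer = session.createConsumer(queue);
            // A fresh listener per session avoids sharing state across threads.
            consumer.setMessageListener(new MyCustomListener());
        }

        // Start delivery only after every listener is registered.
        connection.start();
        return connection;
    }
}
```

With something like ConcurrentSQSConsumers.start("test-queue", 4), the logs should show more than one thread ID across onMessage() calls, while each individual listener still receives its messages one at a time.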
That said, you can certainly manage the concurrency yourself if you like (e.g. using the classes in java.util.concurrent). That is fine for simple use cases involving AUTO_ACKNOWLEDGE. However, if you ever move beyond such a simple use case to something more complex involving CLIENT_ACKNOWLEDGE or transactions, your code will get more complicated because you'll have to manage thread-safe access to the Session object to deal with acknowledgements, commits, rollbacks, etc. Creating multiple sessions and consumers is often simpler to implement and more straightforward to maintain.
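For the do-it-yourself route, a minimal sketch of the hand-off might look like the following. It uses only java.util.concurrent so that it runs on its own; MessageDispatcher, dispatch() and the pool size are hypothetical names, not part of JMS or the SQS library. In a real listener, onMessage() would read the message text on the listener thread and then call dispatch(text).

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: hands message bodies from the single listener
// thread to a fixed pool of worker threads.
final class MessageDispatcher {
    private final ExecutorService workers;
    private final Set<String> workerThreads = ConcurrentHashMap.newKeySet();
    private final AtomicInteger processed = new AtomicInteger();

    MessageDispatcher(int poolSize) {
        this.workers = Executors.newFixedThreadPool(poolSize);
    }

    // Returns immediately; the body is processed later on a pool thread.
    void dispatch(String body) {
        workers.submit(() -> {
            workerThreads.add(Thread.currentThread().getName());
            process(body);
            processed.incrementAndGet();
        });
    }

    // Stand-in for the real work (helper() in the question).
    private void process(String body) {
        System.out.println(Thread.currentThread().getName() + " processed: " + body);
    }

    Set<String> workerThreadNames() {
        return workerThreads;
    }

    int processedCount() {
        return processed.get();
    }

    // Stops accepting work and waits for queued tasks to finish.
    boolean shutdownAndWait(long timeoutSeconds) {
        workers.shutdown();
        try {
            return workers.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class DispatcherDemo {
    public static void main(String[] args) {
        MessageDispatcher dispatcher = new MessageDispatcher(4);
        for (int i = 0; i < 8; i++) {
            dispatcher.dispatch("message-" + i);
        }
        dispatcher.shutdownAndWait(5);
    }
}
```

One trade-off to keep in mind: with AUTO_ACKNOWLEDGE the message is generally acknowledged once onMessage() returns, so a message whose worker task fails afterwards will not be redelivered.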