Losing JMS Messages with Spring JMS and ActiveMQ when application server is suddenly stopped

1.2k views Asked by At

I have a Spring JMS application that has a JMS Listener that connects to an Active MQ queue on application startup. This JMS listener is a part of the an application that takes a message, enriches it with content, and then delivers it to a topic on the same ActiveMQ broker.

The sessionTransacted is set to True. I'm not performing any database transactions, so I do not have @Transactional set anywhere. From what I've read, the sessionTransacted property sets a local transaction around the JMS Listener's receive method and therefore it will not pull the message off the queue until the transaction is complete. I've tested this using a local ActiveMQ instance, and on my local tomcat container, and it worked as expected.

However, when I deploy to our PERF environment and retry the same test, I notice that the message that was currently in-flight when the server was shutdown, is pulled from queue prior to completing the receive method.

What I would like to know is if there is anything obvious that I should be looking for? Are there certain JMS headers that would cause this behaviour to occur? Please let me know if there is anymore information that I can provide.

I'm using Spring 4.1.2.RELEASE with Apache ActiveMQ 5.8.0, on a Tomcat 7 container running Java 8.

UPDATE - Adding my Java JMS Configurations. Please note that I substituted what I had in my PERF properties file into the relevant areas for clarity.

    @Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setMaxMessagesPerTask(-1);
    factory.setConcurrency(1);
    factory.setSessionTransacted(Boolean.TRUE);
    return factory;
}

@Bean
public CachingConnectionFactory connectionFactory(){
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();

    redeliveryPolicy.setInitialRedeliveryDelay(1000);
    redeliveryPolicy.setRedeliveryDelay(1000);
    redeliveryPolicy.setMaximumRedeliveries(6);
    redeliveryPolicy.setUseExponentialBackOff(Boolean.TRUE);
    redeliveryPolicy.setBackOffMultiplier(5);

    ActiveMQConnectionFactory activeMQ = new ActiveMQConnectionFactory(environment.getProperty("queue.username"), environment.getProperty("queue.password"), environment.getProperty("jms.broker.endpoint"));
    activeMQ.setRedeliveryPolicy(redeliveryPolicy);
    activeMQ.setPrefetchPolicy(prefetchPolicy());

    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(activeMQ);
    cachingConnectionFactory.setCacheConsumers(Boolean.FALSE);
    cachingConnectionFactory.setSessionCacheSize(1);
    return cachingConnectionFactory;
}

@Bean
public JmsMessagingTemplate jmsMessagingTemplate(){
    ActiveMQTopic activeMQ = new ActiveMQTopic(environment.getProperty("queue.out"));

    JmsMessagingTemplate jmsMessagingTemplate = new JmsMessagingTemplate(connectionFactory());
    jmsMessagingTemplate.setDefaultDestination(activeMQ);

    return jmsMessagingTemplate;
}

protected ActiveMQPrefetchPolicy prefetchPolicy(){
    ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
    int prefetchValue = 0; 
    prefetchPolicy.setQueuePrefetch(prefetchValue);
    return prefetchPolicy;
}

Thanks,

Juan

1

There are 1 answers

0
jcb On BEST ANSWER

It turns out that there were different versions of our application deployed on our PERF environment. Once the application was updated, then it worked as expected.