I am trying to add transactions over an AMQP connection which I created for connecting to an external AMQP broker (which I do not have access to) from my project.
In order to connect to the AMQP broker with SSL I am using a ConnectionFactory implementation from Qpid: org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl
.
I am connecting with something like below:
URI: amqps://x.x.x.x:port?brokerlist='ssl://x.x.x.x:port'
topic: topic://topicName
a truststore and a keystore
The connection is initialized from an Apache Camel (v2.14.1) route, with the Camel AMQP component which was built on top of JMS, exactly as stated in the AMQP Component documentation from Apache Camel (which can be found here: http://camel.apache.org/amqp.html in section "Using Topics").
The AMQP component works perfectly fine when I use it without transactions enabled.
If, on the other hand, I enable transactions by:
providing a transaction manager which has to be an implementation of the interface
org.springframework.transaction.PlatformTransactionManager
asorg.springframework.transaction.jta.JtaTransactionManager
setting transacted=true configuration option on the Camel endpoint
setting cacheLevelName=CACHE_NONE on the Camel endpoint somehow only one messaged gets picked up from the topic, everything else is lost/ignored.
The logs show that the transaction manager is created successfully:
DEBUG org.apache.camel.util.IntrospectionSupport: Configured property: transactionManager on bean: org.apache.camel.component.jms.JmsConfiguration@5b05bf with value: org.springframework.transaction.jta.JtaTransactionManager@1d642e2
DEBUG org.apache.camel.util.IntrospectionSupport: Configured property: cacheLevelName on bean: org.apache.camel.component.jms.JmsConfiguration@5b05bf with value: CACHE_NONE
DEBUG org.apache.camel.util.IntrospectionSupport: Configured property: concurrentConsumers on bean: org.apache.camel.component.jms.JmsConfiguration@5b05bf with value: 1
DEBUG org.apache.camel.util.IntrospectionSupport: Configured property: transacted on bean: org.apache.camel.component.jms.JmsConfiguration@5b05bf with value: true
Would you please let me know what am I doing wrong?
(I suspect that maybe the broker I am connecting to does not support transactions)
I have managed to fix this issue ( sorry for the very late post here with the answer ). I have found a lot of usefull knowledge here: http://tmielke.blogspot.ro/2012/03/camel-jms-with-transactions-lessons.html
It does not matter which transaction manager you use (JmsTransactionManager, Atomikos etc.), all I had to do is to set the option cacheLevelName=CACHE_CONSUMER on the endpoint.
If you need additional information, you will find everything in the link I posted here.