I am trying to integrate Azure Service Bus into my Java Spring project.
My Azure Service Bus subscription has sessions enabled.
I am using org.springframework.jms.annotation.JmsListener to set up a listener.
PROBLEM:
- I am not able to create a listener that listens to a session-enabled subscription.
- I am also not able to push messages to the topic; they go to the dead-letter queue (DeadLetterReason: "Session id is null."; DeadLetterErrorDescription: "Session enabled entity doesn't allow a message whose session identifier is null.").
QUESTION:
- How can I send messages to and receive messages from a session-enabled topic and subscription in Spring with Azure Service Bus?
Sample code:
private final JmsTemplate jmsTemplate;

public void sendMessage() {
    jmsTemplate.convertAndSend(TOPIC_NAME, "sample_data");
}

@JmsListener(destination = TOPIC_NAME, subscription = SUBSCRIPTION_NAME, containerFactory = "topicJmsListenerContainerFactory")
public void receiveMessage(Object data) throws JMSException {
    // logic to process message
}
FYI: The code works as expected for non-session-enabled topics and subscriptions.
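To make the dead-letter behaviour described above concrete, here is a toy model in plain Java of the rule the broker appears to apply, inferred only from the DeadLetterErrorDescription text. The names SessionRoutingSketch, Message, Subscription and deliver are hypothetical and are not part of the Azure SDK or Spring JMS; this is a sketch of the rule under that assumption, not of the real broker.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these types exist in the Azure SDK or in Spring JMS.
public class SessionRoutingSketch {

    // Hypothetical message: a body plus an optional session identifier.
    static final class Message {
        final String body;
        final String sessionId; // null models a message sent without a session ID

        Message(String body, String sessionId) {
            this.body = body;
            this.sessionId = sessionId;
        }
    }

    // Hypothetical subscription that may or may not require sessions.
    static final class Subscription {
        final boolean requiresSession;
        final List<Message> active = new ArrayList<>();
        final List<Message> deadLetter = new ArrayList<>();

        Subscription(boolean requiresSession) {
            this.requiresSession = requiresSession;
        }

        // Returns true if the message is accepted, false if it is dead-lettered.
        boolean deliver(Message m) {
            if (requiresSession && m.sessionId == null) {
                // Mirrors "Session enabled entity doesn't allow a message
                // whose session identifier is null."
                deadLetter.add(m);
                return false;
            }
            active.add(m);
            return true;
        }
    }

    public static void main(String[] args) {
        Subscription sessionful = new Subscription(true);
        sessionful.deliver(new Message("sample_data", null));        // dead-lettered
        sessionful.deliver(new Message("sample_data", "session-1")); // accepted
        System.out.println("active=" + sessionful.active.size()
                + " deadLetter=" + sessionful.deadLetter.size());
    }
}
```

If this reading is right, every message sent to the topic has to carry a session ID. As far as I understand, Service Bus maps the AMQP group-id property to its session ID and Qpid JMS exposes group-id as the JMSXGroupID property, so setting that property on outgoing messages may be one way to do it, but that is an assumption I have not verified.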
The error I keep getting:
javax.jms.JMSException: It is not possible for an entity that requires sessions to create a non-sessionful message receiver. TrackingId:some_uuid, SystemTracker:some_uuid:subscription_name, Timestamp:2023-12-11T20:04:41 TrackingId:some_uuid, SystemTracker:some_uuid, Timestamp:2023-12-11T20:04:41 [condition = amqp:not-allowed]
at org.apache.qpid.jms.provider.ProviderException.toJMSException(ProviderException.java:34) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.JmsConnection.createResource(JmsConnection.java:698) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.JmsMessageConsumer.<init>(JmsMessageConsumer.java:125) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.JmsTopicSubscriber.<init>(JmsTopicSubscriber.java:36) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.JmsDurableTopicSubscriber.<init>(JmsDurableTopicSubscriber.java:29) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.JmsSession.createDurableSubscriber(JmsSession.java:571) ~[qpid-jms-client-0.53.0.jar:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.jms.connection.CachingConnectionFactory$CachedSessionInvocationHandler.invoke(CachingConnectionFactory.java:412) ~[spring-jms-5.3.31.jar:5.3.31]
at com.sun.proxy.$Proxy237.createDurableSubscriber(Unknown Source) ~[na:na]
at org.springframework.jms.listener.AbstractMessageListenerContainer.createConsumer(AbstractMessageListenerContainer.java:863) ~[spring-jms-5.3.31.jar:5.3.31]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:225) ~[spring-jms-5.3.31.jar:5.3.31]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1264) ~[spring-jms-5.3.31.jar:5.3.31]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1236) ~[spring-jms-5.3.31.jar:5.3.31]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1227) ~[spring-jms-5.3.31.jar:5.3.31]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1120) ~[spring-jms-5.3.31.jar:5.3.31]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.qpid.jms.provider.ProviderException: It is not possible for an entity that requires sessions to create a non-sessionful message receiver.
at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToNonFatalException(AmqpSupport.java:181) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.getOpenAbortExceptionFromRemote(AmqpResourceBuilder.java:299) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:185) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:129) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:985) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.provider.amqp.AmqpProvider.onData(AmqpProvider.java:871) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:563) ~[qpid-jms-client-0.53.0.jar:na]
at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:556) ~[qpid-jms-client-0.53.0.jar:na]
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1383) ~[netty-handler-4.1.92.Final.jar:4.1.92.Final]
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1246) ~[netty-handler-4.1.92.Final.jar:4.1.92.Final]
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1295) ~[netty-handler-4.1.92.Final.jar:4.1.92.Final]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) ~[netty-codec-4.1.92.Final.jar:4.1.92.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ~[netty-codec-4.1.92.Final.jar:4.1.92.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.92.Final.jar:4.1.92.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.92.Final.jar:4.1.92.Final]
... 1 common frames omitted
This warning is logged before the exception is thrown:
2023-12-12 01:34:40.868 WARN 72600 --- [ntContainer#0-2] o.s.j.l.DefaultMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'topic_name' - trying to recover. Cause: It is not possible for an entity that requires sessions to create a non-sessionful message receiver.
External Maven dependencies I am using:
<!-- https://mvnrepository.com/artifact/com.azure.spring/spring-cloud-azure-starter-servicebus-jms -->
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-servicebus-jms</artifactId>
<version>4.13.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-jms -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.3.31</version>
</dependency>
I looked into a different approach using the Azure Service Bus SDK directly: I used ServiceBusSenderClient to send messages with a session ID. I then went back to the original repro and reconfigured the JMS listener as below.