How to enable session based JMS-listeners for azure-service-bus?


I am trying to integrate azure-service-bus into my java spring-project.

My azure-service-bus subscription has sessions enabled.

I am using org.springframework.jms.annotation.JmsListener to set up a listener.

PROBLEM :

  • I am not able to create a listener for a session-enabled subscription.
  • I am also not able to push messages to the topic; they go to the dead-letter queue (DeadLetterReason -> Session id is null., DeadLetterErrorDescription -> Session enabled entity doesn't allow a message whose session identifier is null.)

QUESTION :

  • How do I send and receive messages on session-enabled entities in Spring with azure-service-bus?

sample code :

private final JmsTemplate jmsTemplate;

public void sendMessage() {
    jmsTemplate.convertAndSend(TOPIC_NAME, "sample_data");
}

@JmsListener(destination = TOPIC_NAME, subscription = SUBSCRIPTION_NAME, containerFactory = "topicJmsListenerContainerFactory")
public void receiveMessage(Object data) throws JMSException {
    // logic to process message
}

FYI: The code works as expected for non-session-enabled topics and subscriptions.

The error I keep getting:

javax.jms.JMSException: It is not possible for an entity that requires sessions to create a non-sessionful message receiver. TrackingId:some_uuid, SystemTracker:some_uuid:subscription_name, Timestamp:2023-12-11T20:04:41 TrackingId:some_uuid, SystemTracker:some_uuid, Timestamp:2023-12-11T20:04:41 [condition = amqp:not-allowed]
    at org.apache.qpid.jms.provider.ProviderException.toJMSException(ProviderException.java:34) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.JmsConnection.createResource(JmsConnection.java:698) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.JmsMessageConsumer.<init>(JmsMessageConsumer.java:125) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.JmsTopicSubscriber.<init>(JmsTopicSubscriber.java:36) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.JmsDurableTopicSubscriber.<init>(JmsDurableTopicSubscriber.java:29) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.JmsSession.createDurableSubscriber(JmsSession.java:571) ~[qpid-jms-client-0.53.0.jar:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.jms.connection.CachingConnectionFactory$CachedSessionInvocationHandler.invoke(CachingConnectionFactory.java:412) ~[spring-jms-5.3.31.jar:5.3.31]
    at com.sun.proxy.$Proxy237.createDurableSubscriber(Unknown Source) ~[na:na]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.createConsumer(AbstractMessageListenerContainer.java:863) ~[spring-jms-5.3.31.jar:5.3.31]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:225) ~[spring-jms-5.3.31.jar:5.3.31]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1264) ~[spring-jms-5.3.31.jar:5.3.31]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1236) ~[spring-jms-5.3.31.jar:5.3.31]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1227) ~[spring-jms-5.3.31.jar:5.3.31]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1120) ~[spring-jms-5.3.31.jar:5.3.31]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.qpid.jms.provider.ProviderException: It is not possible for an entity that requires sessions to create a non-sessionful message receiver. 
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToNonFatalException(AmqpSupport.java:181) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.getOpenAbortExceptionFromRemote(AmqpResourceBuilder.java:299) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:185) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:129) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:985) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.onData(AmqpProvider.java:871) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:563) ~[qpid-jms-client-0.53.0.jar:na]
    at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:556) ~[qpid-jms-client-0.53.0.jar:na]
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1383) ~[netty-handler-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1246) ~[netty-handler-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1295) ~[netty-handler-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) ~[netty-codec-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ~[netty-codec-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.92.Final.jar:4.1.92.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.92.Final.jar:4.1.92.Final]
    ... 1 common frames omitted

This warning is logged before the exception is thrown:

2023-12-12 01:34:40.868 WARN 72600 --- [ntContainer#0-2] o.s.j.l.DefaultMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'topic_name' - trying to recover. Cause: It is not possible for an entity that requires sessions to create a non-sessionful message receiver.

External Maven dependencies I am using:

<!-- https://mvnrepository.com/artifact/com.azure.spring/spring-cloud-azure-starter-servicebus-jms -->
<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-servicebus-jms</artifactId>
    <version>4.13.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.springframework/spring-jms -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>5.3.31</version>
</dependency>

There is 1 answer

Suresh Chikkam

I first tried a different approach that uses the Azure Service Bus SDK directly, with ServiceBusSenderClient sending messages that carry a session ID.

import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusSenderClient;

public class MessageSender {

    private final ServiceBusSenderClient sender;

    public MessageSender(String connectionString, String topicName) {
        this.sender = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .sender()
                .topicName(topicName)
                .buildSenderClient();
    }

    public void sendMessageWithSessionId(String sessionId, String message) {
        // Build the message explicitly and attach the session ID before sending
        sender.sendMessage(new ServiceBusMessage(message).setSessionId(sessionId));
    }
}
  • But this alone did not get session messages working on my subscription, and I got the same error.

I then went back to the original setup and reconfigured the JMS listener as below.

import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class JmsConfig {

    @Bean
    public DefaultJmsListenerContainerFactory topicJmsListenerContainerFactory(
            DefaultJmsListenerContainerFactoryConfigurer configurer,
            JmsTemplate jmsTemplate) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, jmsTemplate.getConnectionFactory());
        // Transacted JMS sessions, if needed (a JMS session is not a Service Bus message session)
        factory.setSessionTransacted(true);
        // Durable subscriptions, if needed
        factory.setSubscriptionDurable(true);
        return factory;
    }
}
  • When sending messages to a topic, ensure that you set the session ID property.
import org.springframework.jms.core.JmsTemplate;

public class MessageSender {

    private final JmsTemplate jmsTemplate;

    public MessageSender(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void sendMessageWithSessionId(String sessionId, String message) {
        jmsTemplate.send("yourTopicName", session -> {
            javax.jms.Message jmsMessage = session.createTextMessage(message);

            // JMSXGroupID travels as the AMQP group-id, which Service Bus reads as the session ID
            jmsMessage.setStringProperty("JMSXGroupID", sessionId);

            return jmsMessage;
        });
    }
}
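To see why the session ID matters on both the sending and the receiving side, here is a minimal sketch of the two rules quoted in the question's error messages. It is a toy in-memory model in plain Java, not the Azure SDK or the JMS API; the class SessionEntityModel and its methods are hypothetical names made up for illustration. It assumes only what the errors state: a message without a session ID is dead-lettered, and a receiver that does not ask for a session is rejected. The real service also lets a receiver accept the next available session, which this model leaves out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of a session-enabled entity (hypothetical, for illustration only).
public class SessionEntityModel {

    // One FIFO queue per session ID
    private final Map<String, Queue<String>> sessions = new HashMap<>();
    private final List<String> deadLetters = new ArrayList<>();

    // Mirrors "Session enabled entity doesn't allow a message whose session identifier is null."
    public boolean send(String sessionId, String body) {
        if (sessionId == null) {
            deadLetters.add(body);
            return false;
        }
        sessions.computeIfAbsent(sessionId, id -> new ArrayDeque<>()).add(body);
        return true;
    }

    // Mirrors "It is not possible for an entity that requires sessions
    // to create a non-sessionful message receiver."
    public String receive(String sessionId) {
        if (sessionId == null) {
            throw new IllegalStateException("non-sessionful receiver not allowed");
        }
        Queue<String> queue = sessions.get(sessionId);
        return queue == null ? null : queue.poll();
    }

    public List<String> deadLetters() {
        return deadLetters;
    }

    public static void main(String[] args) {
        SessionEntityModel entity = new SessionEntityModel();
        entity.send(null, "lost");          // no session ID: dead-lettered
        entity.send("order-1", "created");
        entity.send("order-1", "paid");
        System.out.println(entity.deadLetters());        // [lost]
        System.out.println(entity.receive("order-1"));   // created
        System.out.println(entity.receive("order-1"));   // paid
    }
}
```

In this model, messages that share a session ID come back in the order they were sent, which is the reason to pick the session ID from whatever groups related messages together (an order ID in the example).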
  • In your listener method, check that the parameter type matches the type of the messages being sent.
