Spring - ActiveMQ - Durable Subscription - Close Connection and Resubscribe to get the offline messages

1.2k views Asked by At

I want to implement a solution in Spring-JMS with activeMQ where I want to create a durable subscription to a topic. The purpose is that if a subscriber closes the subscription for a while and once again recreates the durablesubscription with same client id and subscription name, the subscriber should receive all the messages which were delivered during the time subscription was closed.

I want to implement the following logic mentioned in the ORACLE URL for durable subscriptions: https://docs.oracle.com/cd/E19798-01/821-1841/bncgd/index.html

enter image description here

But I am unable to perform this using spring-jms. As per the URL I need to get messageConsumer instance and call close() on that method to stop receiving message temporarily from the topic. But I am not sure how to get it.

Following is my configuration. Kindly let me know how to modify the configuration to perform this.

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">


<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
    p:userName="admin"
    p:password="admin" 
    p:brokerURL="tcp://127.0.0.1:61616"
    primary="true"
    ></bean>

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer" p:durableSubscriptionName="gxaa-durable1" p:clientId="gxaa-client1">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="adiTopic"/>
    <property name="messageListener" ref="adiListener"/>
</bean>

<bean id="configTemplate" class="org.springframework.jms.core.JmsTemplate" 
        p:connectionFactory-ref="connectionFactory"
        p:defaultDestination-ref="adiTopic" primary="true"
        p:pubSubDomain="true">
</bean>

<bean id="adiTopic" class="org.apache.activemq.command.ActiveMQTopic" p:physicalName="gcaa.adi.topic"></bean>

<bean id="adiListener" class="com.gcaa.asset.manager.impl.AdiListener"></bean>

1

There are 1 answers

2
Hassen Bennour On BEST ANSWER

why not calling DefaultMessageListenerContainer.stop(); to stop the container and consumers ?

you can inject jmsContainer to another bean and close it when you want and call start() later.

all messages sent to the broker when your durable consumer is offline will be stored until it reconnect.

to make subscription durables you need to add this to jmsContainer bean

    <property name="subscriptionDurable" value="true" />
    <property name="cacheLevel" value="1" />

you can add a subscriptionName or the class name of the specified message listener will be used.

You can add a clientID to the connectionFactory

    <property name="clientID" value="${jms.clientId}" />

or use

<bean class="org.springframework.jms.connection.SingleConnectionFactory" id="singleConnectionFactory"> <constructor-arg ref="connectionFactory" /> <property name="reconnectOnException" value="true" /> <property name="clientId" value="${jms.clientId}" /> </bean>

and update jmsContainer

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer" p:durableSubscriptionName="gxaa-durable1" p:clientId="gxaa-client1"> <property name="connectionFactory" ref="singleConnectionFactory" /> <property name="destination" ref="adiTopic" /> <property name="messageListener" ref="adiListener" /> <property name="subscriptionDurable" value="true" /> <property name="cacheLevel" value="1" /> </bean>

UPDATE :

if your adiListener implements org.springframework.jms.listener.SessionAwareMessageListener it have to define method onMessage(M message, Session session) and when you have the session you can call javax.jms.Session.unsubscribe(String subscriptionName)

subscriptionName is defined above and can be injected to this bean or the class name of the specified message listener can be used.