ActiveMQ JMSListener


I'm new to this topic. I'm using a JMS listener to consume from an ActiveMQ queue that contains split messages. I need to keep listening until the last message has arrived and then send all of them to the UI together. I can listen to the queue and receive messages, but I don't know how many split messages there will be, so I can't tell when to send them. Is there a way to make the listener do this? For example, when there are no more messages in the queue, does the JMS listener produce a null value? Any ideas or help would be appreciated.

I'm using the code below to listen to the queue with a JMS listener.

 private static final String ORDER_RESPONSE_QUEUE = "mail-response-queue";

@JmsListener(destination = ORDER_RESPONSE_QUEUE)
public void receiveMessage(final Message<InventoryResponse> message) throws JMSException {
    LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
    MessageHeaders headers =  message.getHeaders();
    LOG.info("Application : headers received : {}", headers);

    InventoryResponse response = message.getPayload();
    LOG.info("Application : response received : {}",response);  
    LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
}

Can I get queue information using a JMS listener?


There are 1 answers

Hassen Bennour On

Through JMX you have access to information about destinations. For example, you can find out how many messages are pending in a queue.

Note that this number can change if new messages are sent.

long org.apache.activemq.broker.jmx.DestinationViewMBean.getQueueSize()

@MBeanInfo(value="Number of messages in the destination which are yet to be consumed. Potentially dispatched but unacknowledged.")

Returns: the number of messages in this destination which are yet to be consumed.

import java.util.HashMap;
import java.util.Map;

import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;

public class JMXGetDestinationInfos {

    public static void main(String[] args) throws Exception {
        // Replace host, port and credentials with those of your broker's JMX connector.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://host:1099/jmxrmi");
        Map<String, String[]> env = new HashMap<>();
        String[] creds = {"admin", "activemq"};
        env.put(JMXConnector.CREDENTIALS, creds);

        // try-with-resources closes the JMX connection once the size has been printed.
        try (JMXConnector jmxc = JMXConnectorFactory.connect(url, env)) {
            MBeanServerConnection conn = jmxc.getMBeanServerConnection();

            ObjectName activeMq = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
            BrokerViewMBean mbean = MBeanServerInvocationHandler.newProxyInstance(conn, activeMq,
                    BrokerViewMBean.class, true);

            // "Destination" is a placeholder for the name of the queue to inspect.
            for (ObjectName name : mbean.getQueues()) {
                if ("Destination".equals(name.getKeyProperty("destinationName"))) {
                    QueueViewMBean queueMbean = MBeanServerInvocationHandler.newProxyInstance(conn, name,
                            QueueViewMBean.class, true);
                    System.out.println(queueMbean.getQueueSize());
                }
            }
        }
    }
}

Why not consume the messages and display them once no more are received? The method below returns null if no message arrives within the timeout.

ActiveMQMessageConsumer.receive(long timeout) throws JMSException

Receives the next message that arrives within the specified timeout interval. This call blocks until a message arrives, the timeout expires, or this message consumer is closed. A timeout of zero never expires, and the call blocks indefinitely.

Specified by: receive in interface MessageConsumer
Parameters: timeout - the timeout value (in milliseconds); a timeout of zero never expires.
Returns: the next message produced for this message consumer, or null if the timeout expires or this message consumer is concurrently closed
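The "consume until receive(timeout) returns null" idea can be sketched as a small helper. This is only an illustration, not code from the answer: `DrainUntilTimeout`, `TimedReceiver` and `drain` are hypothetical names, and the demo uses a `BlockingQueue` as a stand-in for the broker so that it runs without ActiveMQ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class DrainUntilTimeout {

    /** Same shape as MessageConsumer.receive(long): returns null when the timeout expires. */
    @FunctionalInterface
    public interface TimedReceiver<T> {
        T receive(long timeoutMs) throws Exception;
    }

    /** Collects messages until one receive call times out (returns null). */
    public static <T> List<T> drain(TimedReceiver<T> receiver, long timeoutMs) throws Exception {
        if (timeoutMs <= 0) {
            // In JMS a timeout of zero blocks forever, so the loop would never end.
            throw new IllegalArgumentException("timeoutMs must be positive");
        }
        List<T> batch = new ArrayList<>();
        T next;
        while ((next = receiver.receive(timeoutMs)) != null) {
            batch.add(next);
        }
        return batch;
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the queue; poll(timeout) also returns null on timeout.
        BlockingQueue<String> queue =
                new LinkedBlockingQueue<>(Arrays.asList("part-1", "part-2", "part-3"));
        List<String> batch = drain(t -> queue.poll(t, TimeUnit.MILLISECONDS), 200);
        System.out.println(batch); // prints [part-1, part-2, part-3]
    }
}
```

With a real `javax.jms.MessageConsumer` named `consumer`, the call would be `drain(consumer::receive, 1000)`. The timeout has to be longer than the largest gap you expect between two split messages, otherwise the batch is cut short.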

UPDATE

Maybe like this:

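import java.util.HashMap;
import java.util.Map;

import javax.jms.JMSException;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

// InventoryResponse is the payload class from the question; import it from your own package.
public class JMXGetDestinationInfos {

    // Assumed to be SLF4J, matching the {} placeholders used in the log calls.
    private static final Logger LOG = LoggerFactory.getLogger(JMXGetDestinationInfos.class);
    private static final String ORDER_RESPONSE_QUEUE = "mail-response-queue";

    private QueueViewMBean queueMbean;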

    {
        try {
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://host:1099/jmxrmi");
            Map<String, String[]> env = new HashMap<>();
            String[] creds = { "admin", "activemq" };
            env.put(JMXConnector.CREDENTIALS, creds);
            JMXConnector jmxc = JMXConnectorFactory.connect(url, env);
            MBeanServerConnection conn = jmxc.getMBeanServerConnection();

            ObjectName activeMq = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");

            BrokerViewMBean mbean = MBeanServerInvocationHandler.newProxyInstance(conn, activeMq, BrokerViewMBean.class,
                    true);
            for (ObjectName name : mbean.getQueues()) {
                if (("Destination".equals(name.getKeyProperty("destinationName")))) {
                    queueMbean = MBeanServerInvocationHandler.newProxyInstance(conn, name, QueueViewMBean.class, true);
                    System.out.println(queueMbean.getQueueSize());
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @JmsListener(destination = ORDER_RESPONSE_QUEUE)
    public void receiveMessage(final Message<InventoryResponse> message, javax.jms.Message amqMessage) throws JMSException {
        LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
        MessageHeaders headers =  message.getHeaders();
        LOG.info("Application : headers received : {}", headers);

        InventoryResponse response = message.getPayload();
        LOG.info("Application : response received : {}",response);  
        LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
        // Acknowledge first: getQueueSize() also counts dispatched but unacknowledged messages.
        amqMessage.acknowledge();
        // getQueueSize() is live; each call returns the current size.
        if (queueMbean != null && queueMbean.getQueueSize() == 0) {
            // no messages pending: display the collected messages here
        }
    }
}

The message is acknowledged before the size check because getQueueSize() returns the number of messages in the destination which are yet to be consumed, including messages that have been dispatched but not yet acknowledged.

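One solution is to create the sessions with the acknowledge mode org.apache.activemq.ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE in your Spring DefaultMessageListenerContainer, acknowledge each message individually, and then check whether the size is 0 (size == 0 means all messages have been dispatched and acknowledged). Because INDIVIDUAL_ACKNOWLEDGE is an ActiveMQ-specific constant rather than one defined on javax.jms.Session, it most likely has to be set through the numeric sessionAcknowledgeMode property; sessionAcknowledgeModeName resolves only the standard JMS constant names.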
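The listener above logs each message but does not keep it, so something still has to collect the parts until the queue is empty. A minimal sketch of such a buffer follows. `SplitMessageBatcher` and `onMessage` are hypothetical names, and the pending count is passed in as a `LongSupplier` so the example runs without a broker; in the listener it would be `queueMbean::getQueueSize`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class SplitMessageBatcher<T> {

    private final List<T> buffer = new ArrayList<>();
    private final LongSupplier pendingCount;

    public SplitMessageBatcher(LongSupplier pendingCount) {
        this.pendingCount = pendingCount;
    }

    /**
     * Buffers one part. Returns the complete batch once nothing is pending,
     * otherwise an empty list. Call it after the message has been acknowledged.
     */
    public synchronized List<T> onMessage(T part) {
        buffer.add(part);
        if (pendingCount.getAsLong() > 0) {
            return Collections.emptyList();
        }
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }

    public static void main(String[] args) {
        // Stand-in for the broker's queue size: three messages pending at the start.
        AtomicLong pending = new AtomicLong(3);
        SplitMessageBatcher<String> batcher = new SplitMessageBatcher<>(pending::get);
        for (String part : Arrays.asList("part-1", "part-2", "part-3")) {
            pending.decrementAndGet(); // stands in for acknowledging the message
            System.out.println(part + " -> " + batcher.onMessage(part));
        }
        // prints:
        // part-1 -> []
        // part-2 -> []
        // part-3 -> [part-1, part-2, part-3]
    }
}
```

As noted earlier, the queue size can change when new messages are sent, so a slow producer can leave the queue briefly empty between two parts and trigger an early flush. If the producer can mark the last part, for example with a message property, that is a more reliable end-of-batch signal than the queue size.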