I need to receive a notification when an ActiveMQ client consumes an MQTT message.
activemq.xml:

    <destinationPolicy>
        <policyMap>
            <policyEntries>
                <policyEntry topic=">" advisoryForConsumed="true" />
            </policyEntries>
        </policyMap>
    </destinationPolicy>

In the code below, I receive MQTT messages on mytopic fine, but I do not receive advisory messages in processAdvisoryMessage or processAdvisoryBytesMessage.

    @Component
    public class MqttMessageListener {

        @JmsListener(destination = "mytopic")
        public void processMessage(BytesMessage message) {
        }

        @JmsListener(destination = "ActiveMQ.Advisory.MessageConsumed.Topic.>")
        public void processAdvisoryMessage(Message message) {
            System.out.println("processAdvisoryMessage: Got a message");
        }

        @JmsListener(destination = "ActiveMQ.Advisory.MessageConsumed.Topic.>")
        public void processAdvisoryBytesMessage(BytesMessage message) {
            System.out.println("processAdvisoryBytesMessage: Got a message");
        }
    }

What am I doing wrong?
I have also attempted this with an ActiveMQ BrokerFilter:

    public class AMQMessageBrokerFilter extends GenericBrokerFilter {

        @Override
        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
            super.acknowledge(consumerExchange, ack);
        }

        @Override
        public void postProcessDispatch(MessageDispatch messageDispatch) {
            Message message = messageDispatch.getMessage();
        }

        @Override
        public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
            log.debug("messageDelivered called.");
            super.messageDelivered(context, messageReference);
        }

        @Override
        public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
            log.debug("messageConsumed called.");
            super.messageConsumed(context, messageReference);
        }
    }

In this second scenario I could not get both the message and a connection context, which I need to send the consumed notification, in the same callback. acknowledge, messageDelivered, and messageConsumed all have a connection context, but only postProcessDispatch gives me the message, and I need part of it (the payload is JSON) to build my outgoing message. I could be eager and use send, which has both, but it is safer to wait until the message has at least been acknowledged.
I have tried:

    @Override
    public void postProcessDispatch(MessageDispatch messageDispatch) {
        super.postProcessDispatch(messageDispatch);
        String topic = messageDispatch.getDestination().getPhysicalName();
        // Skip notifications for the response topic itself to avoid a feedback loop.
        if (topic == null || topic.equals("delivered"))
            return;
        try {
            ActiveMQTopic responseTopic = new ActiveMQTopic("delivered");
            ActiveMQTextMessage responseMsg = new ActiveMQTextMessage();
            responseMsg.setPersistent(false);
            responseMsg.setResponseRequired(false);
            responseMsg.setProducerId(new ProducerId());
            responseMsg.setText("Delivered msg: " + msg);
            responseMsg.setDestination(responseTopic);
            String messageKey = ":" + rand.nextLong();
            MessageId msgId = new MessageId(messageKey);
            responseMsg.setMessageId(msgId);
            ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
            ConnectionContext context = getAdminConnectionContext();
            producerExchange.setConnectionContext(context);
            producerExchange.setMutable(true);
            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
            next.send(producerExchange, responseMsg);
        }
        catch (Exception e) {
            log.debug("Exception: " + e);
        }
    }

However, the above seems to make the server unstable. I suspect this is related to using getAdminConnectionContext, which seems wrong.
My JMS listener container factory had setPubSubDomain set to false by default, which prevents listeners from subscribing to the advisory topics. I set it to true and the advisory messages started arriving. Note that queue listeners will not work with this set to true. To get around that, I created two factories, one per domain, and gave their beans distinct names.
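
The two-factory setup might look roughly like the sketch below. It assumes Spring JMS with annotation-driven listeners. The bean names topicListenerFactory and queueListenerFactory, the class names, and the queue name myqueue are hypothetical and chosen only for illustration. On newer Spring versions the ConnectionFactory import is jakarta.jms rather than javax.jms.

```java
import javax.jms.ConnectionFactory;
import javax.jms.Message;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.stereotype.Component;

@Configuration
@EnableJms
class JmsListenerConfig {

    // Factory for topic listeners (pub/sub domain), including advisory topics.
    // The bean name "topicListenerFactory" is a hypothetical choice.
    @Bean
    public DefaultJmsListenerContainerFactory topicListenerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(true);   // resolve destinations as topics
        return factory;
    }

    // Factory for queue listeners (point-to-point domain).
    // The bean name "queueListenerFactory" is a hypothetical choice.
    @Bean
    public DefaultJmsListenerContainerFactory queueListenerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(false);  // resolve destinations as queues
        return factory;
    }
}

@Component
class AdvisoryListener {

    // Each listener names the factory that matches its destination type.
    @JmsListener(destination = "ActiveMQ.Advisory.MessageConsumed.Topic.>",
                 containerFactory = "topicListenerFactory")
    public void processAdvisoryMessage(Message message) {
        System.out.println("processAdvisoryMessage: Got a message");
    }

    // "myqueue" is a hypothetical queue name used only to show the queue factory.
    @JmsListener(destination = "myqueue", containerFactory = "queueListenerFactory")
    public void processQueueMessage(Message message) {
        System.out.println("processQueueMessage: Got a message");
    }
}
```

Selecting the factory per listener through the containerFactory attribute keeps topic and queue listeners in the same application without changing a global default.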