Delayed delivery with ActiveMQ 5.3 and NMS

1.6k views Asked by At

I'm attempting to use the new delayed delivery functionality from NMS.

The schedulerSupport attribute has been set in the config file, and I'm using the following code to attempt to delay delivery of a message until the date/time chosen by the user is reached.

The code (which does not seem to be working currently) is as follows:

var timeDelay = dateTimePicker.Value.Subtract(DateTime.Now).TotalMilliseconds;     
var message = topicPublisher.CreateTextMessage();
message.Properties["AMQ_SCHEDULED_DELAY"] = timeDelay;
message.Text = CM.ToXMLString();

topicPublisher.Send(message);

Can you point out what might be incorrect within this example?

Many thanks!

2

There are 2 answers

2
Tim Bish On BEST ANSWER

I don't see anything obvious from the code provided.

You could try turning up the logging in the broker to see if the scheduler receives the message and that the values are correct, that would also confirm that you have indeed enabled scheduler support. You could also try creating a small java program that does something similar to determine if the NMS client is behaving correctly.

I assume you have a consumer running and its connection object has been started?

Regards Tim.

www.fusesource.com

1
rvxnet On

Thanks for your help with this Tim. I have found the cause of the issue. Slightly delayed response on my part - I had to concentrate on other areas of work, but I have today managed to come back to this.

The issue is the C# ".TotalMilliseconds" function - this returns a fractional total.partial milliseconds value, which it is apparent that ActiveMQ is not fond of.

Now I'm converting this millisecond value to an integer, the delayed messaging is working as required.

Log Excerpt Below:

2011-03-07 10:14:44,186 | DEBUG | quasar adding destination: topic://REDACTED | org.apache.activemq.broker.region.AbstractRegion | ActiveMQ Transport: tcp:///REDACTED:50161
2011-03-07 10:14:44,576 | DEBUG | Error occured while processing async command: ActiveMQTextMessage {commandId = 4, responseRequired = false, messageId = ID:HL003323-50159-634350896828327757-0:0:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:HL003323-50159-634350896828327757-0:0:1:1, destination = topic://REDACTED, transactionId = null, expiration = 0, timestamp = 1299492884437, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@1ae0436, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {targetModule=HL.Services.Blackbird.OrderManager, AMQ_SCHEDULED_DELAY=53564.4233}, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = <?xml version="1.0" encoding="utf-16"?>
<Com...mandMessage>}, exception: java.lang.NullPointerException | org.apache.activemq.broker.TransportConnection.Service | ActiveMQ Transport: tcp:///REDACTED:50161
java.lang.NullPointerException
        at org.apache.activemq.broker.scheduler.SchedulerBroker.send(SchedulerBroker.java:179)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
        at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
        at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:227)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
        at org.apache.activemq.security.AuthorizationBroker.send(AuthorizationBroker.java:192)
        at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
        at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:462)
        at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:677)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:311)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:185)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:228)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:220)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
        at java.lang.Thread.run(Thread.java:595)
2011-03-07 10:14:44,577 | WARN  | Async error occurred: java.lang.NullPointerException | org.apache.activemq.broker.TransportConnection.Service | ActiveMQ Transport: tcp:///REDACTED:50161
java.lang.NullPointerException
        at org.apache.activemq.broker.scheduler.SchedulerBroker.send(SchedulerBroker.java:179)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
        at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
        at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:227)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
        at org.apache.activemq.security.AuthorizationBroker.send(AuthorizationBroker.java:192)
        at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
        at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:462)
        at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:677)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:311)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:185)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:228)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:220)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
        at java.lang.Thread.run(Thread.java:595)