Mule AMQP connector fails trying to requeue message

1.5k views Asked by At

I'm using the AMQP endpoint and have got it working (with one caveat mentioned here [Possible bug with Mule AMQP transport 3.6.2 community) in other places in my Mule application.

I'm trying to build a reliable SMTP queue but having problems.

I made the assumption that if the SMTP connector couldn't perform, it would throw an exception, so by setting message acknowledgement to MANUAL, and catching SMTP exceptions and rejecting (and requeuing) the message in the exception strategy the system should deal with SMTP outages.

My code (slightly simplified) looks like this:

<amqp:connector name="connector.amqp.mule.default" ackMode="MANUAL" doc:name="AMQP Connector" validateConnections="true" host="${amiab-rabbitmq-hostname}" port="${amiab-rabbitmq-portnumber}" password="${amiab-rabbitmq-password}" username="${amiab-rabbitmq-username}"/>
<smtp:connector name="smtpConnector" contentType="text/html" doc:name="SMTPConnector"/>
<flow name="utility-smtpFlow">
  <amqp:inbound-endpoint exchangeName="AMQP.DEFAULT.EXCHANGE" queueName="utilitySMTP" exchangeType="direct"  responseTimeout="10000"  doc:name="AMQP-0-9" queueDurable="true"/>
    <logger message="Recevied SMTP Message from Queue" level="INFO" doc:name="Logger"/>
    <smtp:outbound-endpoint host="${smtp-hostname}" user="${smtp-username}" password="{smtp-password}" to="#[flowVars.smtpTo]" subject="Testing Message" responseTimeout="10000" doc:name="SMTP"/>
    <amqp:acknowledge-message doc:name="AMQP-0-9 Acknowledge Message"/>
    <catch-exception-strategy doc:name="Catch Exception Strategy">
        <logger message="SMTP Error, Requeuing" level="INFO" doc:name="Logger"/>
        <amqp:reject-message requeue="true" doc:name="AMQP-0-9 Reject Message"/>
    </catch-exception-strategy>

When my SMTP connector throws an exception, the exception strategy is correctly instigated, but then AMQP throws an error as shown below:

INFO  2015-06-18 16:32:22,383 [HTTP_Listener_AMIAB.worker.01] org.mule.transport.service.DefaultTransportServiceDescriptor: Loading default outbound transformer: org.mule.transport.amqp.internal.transformer.ObjectToAmqpMessage
INFO  2015-06-18 16:32:22,383 [HTTP_Listener_AMIAB.worker.01] org.mule.transport.service.DefaultTransportServiceDescriptor: Loading default response transformer: org.mule.transport.amqp.internal.transformer.ObjectToAmqpMessage
INFO  2015-06-18 16:32:22,387 [HTTP_Listener_AMIAB.worker.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'connector.amqp.mule.default.dispatcher.936721322'. Object is: Dispatcher
INFO  2015-06-18 16:32:22,389 [HTTP_Listener_AMIAB.worker.01] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'connector.amqp.mule.default.dispatcher.936721322'. Object is: Dispatcher
INFO  2015-06-18 16:32:22,446 [[amiab-esb-utility].utility-smtpFlow.stage1.02] org.mule.api.processor.LoggerMessageProcessor: Recevied SMTP Message from Queue
INFO  2015-06-18 16:32:22,457 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.transport.service.DefaultTransportServiceDescriptor: Loading default outbound transformer: org.mule.transport.email.transformers.ObjectToMimeMessage
INFO  2015-06-18 16:32:22,481 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'smtpConnector.dispatcher.404056326'. Object is: SmtpMessageDispatcher
ERROR 2015-06-18 16:32:22,568 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.exception.CatchMessagingExceptionStrategy: 
********************************************************************************
Message               : Unable to connect to mail transport.
Code                  : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. mail.lyrical.co.uk (java.net.UnknownHostException)
  java.net.AbstractPlainSocketImpl:178 (null)
2. Unknown SMTP host: mail.lyrical.co.uk (javax.mail.MessagingException)
  com.sun.mail.smtp.SMTPTransport:1704 (http://java.sun.com/j2ee/sdk_1.3/techdocs/api/javax/mail/MessagingException.html)
3. Unable to connect to mail transport. (org.mule.api.endpoint.EndpointException)
  org.mule.transport.email.SmtpMessageDispatcher:67 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/endpoint/EndpointException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.net.UnknownHostException: mail.lyrical.co.uk
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:178)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    + 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************

INFO  2015-06-17 15:25:47,190 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.api.processor.LoggerMessageProcessor: SMTP Error, Requeuing
WARN  2015-06-17 15:25:47,193 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MessageReceiverConsumer: Received shutdown signal for consumer tag: amq.ctag-NZ96mnj_oscnpJKAPy16ng, the message receiver will try to restart.
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=90)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478) ~[?:?]
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) ~[?:?]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) ~[?:?]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) ~[?:?]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550) ~[?:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.7.0_75]
INFO  2015-06-17 15:25:47,195 [amqpReceiver.01] org.mule.lifecycle.AbstractLifecycleManager: Stopping: 'null'. Object is: MultiChannelMessageSubReceiver
INFO  2015-06-17 15:25:47,195 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MultiChannelMessageSubReceiver: Connecting clusterizable message receiver
INFO  2015-06-17 15:25:47,195 [amqpReceiver.01] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'null'. Object is: MultiChannelMessageSubReceiver
INFO  2015-06-17 15:25:47,196 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MultiChannelMessageSubReceiver: Starting clusterizable message receiver
INFO  2015-06-17 15:25:47,203 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MultiChannelMessageSubReceiver: Started subscription: amq.ctag-sCCRCnOWKyREyip26pSYDA on channel: AMQChannel(amqp://[email protected]:5672/,3)

Have I missed something obvious, or am I misunderstanding how this should be used? Very grateful if anyone can point me in the right direction.

1

There are 1 answers

1
David Dossot On BEST ANSWER

As you have noted in your comment, the smtp:outbound-endpoint being a one-way endpoint, its dispatch operation is performed in another thread, thus it occurs in parallel with the execution of the flow. Therefore, amqp:acknowledge-message gets called anyway, independently of the success or failure of the smtp:outbound-endpoint operation.

You can get around this issue by setting processingStrategy="synchronous" on the flow: this will force all the endpoint interactions to be performed on the inbound thread.

Shameless plug: this is discussed in details in chapter 11.2 of Mule in Action, Second Edition.