We have an inbound channel adapter that receives notifications of an event. The complexity of the consumers' criteria restricts our ability to use a simple routing key to distribute the messages, so the application uses a splitter to send that message to interested subscribers' queues via a direct exchange.
We want to use publisher confirms on our outbound channel adapter to ensure delivery to the client queues. We want to wait for the publisher confirm before acking the original message, and if a publisher confirm is never received or arrives with ack==false, we want to nack the original message that came from the inbound channel adapter. I assume this would be done in the confirm-callback of the Rabbit Template, but I am not sure how to accomplish this (or whether it is even possible).
<rabbit:connection-factory id="rabbitConnectionFactory"
    host="${amqpHost}"
    username="${amqpUsername}"
    password="${amqpPassword}"
    virtual-host="${amqpVirtualHost}"
    publisher-confirms="true" />

<rabbit:template id="rabbitTemplate"
    connection-factory="rabbitConnectionFactory"
    confirm-callback="PublisherConfirms" />

<int-amqp:inbound-channel-adapter channel="notificationsFromRabbit"
    queue-names="#{'${productNotificationQueue}' + '${queueSuffix}'}"
    connection-factory="rabbitConnectionFactory"
    mapped-request-headers="*"
    message-converter="productNotificationMessageConverter" />

<int:chain input-channel="notificationsFromRabbit" output-channel="notificationsToClients">
    <int:service-activator ref="NotificationRouter"
        method="addRecipientsHeaders" />
    <int:splitter ref="NotificationRouter"
        method="groupMessages" />
    <int:object-to-json-transformer />
</int:chain>

<int-amqp:outbound-channel-adapter channel="notificationsToClients"
    amqp-template="rabbitTemplate"
    exchange-name="${servicesClientsExchange}"
    routing-key=""
    mapped-request-headers="*" />
At the moment we are acking the messages in the groupMessages method by passing the Channel and delivery tag as parameters. But if the broker never sends a confirm, or confirms with ack=false, then it is too late to nack the message from the inbound channel adapter.
Am I going to need a bean that keeps a Map<Channel, Long> of the channels and delivery tags to access in the confirm-callback, or is there some other way?
Is the channel from the inbound channel adapter going to be closed by the time I receive a publisher confirm?
As long as you suspend the consumer thread until all the acks/nacks have been received, you can do what you want.
If you make notificationsFromRabbit a publish-subscribe channel, you can add another subscriber (a service-activator) where you suspend the thread, wait for all the acks/nacks, and take the action you desire.
EDIT:
You can also use Spring Integration to manage the acks for you and it will emit them as messages from the outbound adapter (rather than using a callback yourself).
EDIT2:
You could then use the splitter's sequence size/sequence number headers in your correlation data, enabling the release of the consumer when all the acks are received.
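To illustrate the sequence-header idea, here is a minimal sketch in plain Java with no Spring dependencies. The class name `SequenceTracker` and its methods are hypothetical; it assumes each confirm can be traced back to the original message through some correlation id, and that the splitter numbers fragments from 1 to the sequence size.

```java
import java.util.BitSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: tracks which split fragments have been confirmed.
public class SequenceTracker {

    private static final class State {
        final BitSet seen = new BitSet(); // sequence numbers confirmed so far
        boolean anyNack;                  // true once any fragment was nacked
    }

    private final Map<String, State> states = new ConcurrentHashMap<>();

    /**
     * Records one confirm. Returns true once every fragment (1..sequenceSize)
     * of the original message has been confirmed, whether acked or nacked.
     */
    public boolean record(String correlationId, int sequenceNumber,
                          int sequenceSize, boolean ack) {
        State s = states.computeIfAbsent(correlationId, k -> new State());
        synchronized (s) {
            s.seen.set(sequenceNumber); // a duplicate confirm is harmless
            if (!ack) {
                s.anyNack = true;
            }
            return s.seen.cardinality() == sequenceSize;
        }
    }

    /** Removes the tracked state and reports whether every fragment was acked. */
    public boolean allAcked(String correlationId) {
        State s = states.remove(correlationId);
        if (s == null) {
            throw new IllegalStateException("unknown correlation id: " + correlationId);
        }
        synchronized (s) {
            return !s.anyNack;
        }
    }
}
```

Once `record` returns true, the waiting consumer can be released and `allAcked` decides whether the original delivery should be acked or nacked.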
EDIT3:
Something like this should work...
On the outbound adapter, set confirm-correlation-expression="#this" (the whole outbound message).
Then use a class with two methods: suspend the consumer thread in the first method, and route the acks/nacks from the outbound adapter to the second method.
Caveat: This is not tested, just off the top of my head; but it should be pretty close.
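The two-method class described above might look roughly like the following sketch. This is not the code from the answer; `ConfirmGate`, `suspend` and `onConfirm` are hypothetical names, and it uses only `java.util.concurrent`. It assumes a single consumer thread with one original message in flight at a time, and that the caller knows how many fragments the splitter produced. A semaphore is used so that confirms arriving before the consumer starts waiting are not lost.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical gate: one method blocks the consumer, the other receives confirms.
public class ConfirmGate {

    private final Semaphore confirms = new Semaphore(0);
    private final AtomicBoolean nacked = new AtomicBoolean(false);

    /**
     * First method: called on the consumer thread after the message is handed off.
     * Blocks until 'expected' confirms have arrived or the timeout elapses.
     * Returns true only if all confirms arrived in time and none was a nack.
     */
    public boolean suspend(int expected, long timeoutMillis) {
        try {
            boolean allArrived =
                    confirms.tryAcquire(expected, timeoutMillis, TimeUnit.MILLISECONDS);
            return allArrived && !nacked.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;                       // treat interruption as a failure
        } finally {
            confirms.drainPermits();            // reset for the next message
            nacked.set(false);
        }
    }

    /** Second method: called once per publisher confirm (ack or nack). */
    public void onConfirm(boolean ack) {
        if (!ack) {
            nacked.set(true); // set before release so the waiter sees it
        }
        confirms.release();
    }
}
```

In the real flow, the consumer would ack the original delivery when `suspend` returns true and nack it otherwise. A confirm that arrives after a timeout would be counted against the next message in this sketch, so a production version would need to key the state by correlation data.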