How to send basicAck to inbound adapter after publisher confirm from outbound adapter

749 views Asked by At

We have an inbound channel adapter that receives notifications of an event. The complexity of the consumer's criteria restrict our ability to use a simple routing key to distribute the messages, so the application uses a splitter to send that message to interested subscriber's queues via a direct exchange.

We want to use publisher confirms on our outbound channel adapter the ensure delivery to the client queues. We want to wait for the publisher confirm to ack the original message, and if a publisher confirm fails to be received or if the ack==false we want to nack the original message that came from the inbound channel adapter.

I assume this will be done in the confirm-callback from the Rabbit Template but I am not sure how to accomplish this. (Or if it is even possible)

<rabbit:connection-factory id="rabbitConnectionFactory" 
        host="${amqpHost}"
        username="${amqpUsername}"
        password="${amqpPassword}"
        virtual-host="${amqpVirtualHost}" 
        publisher-confirms="true" />

<rabbit:template id="rabbitTemplate" 
        connection-factory="rabbitConnectionFactory"
        confirm-callback="PublisherConfirms"    />

<int-amqp:inbound-channel-adapter channel="notificationsFromRabbit" 
        queue-names="#{'${productNotificationQueue}' + '${queueSuffix}'}"
        connection-factory="rabbitConnectionFactory"
        mapped-request-headers="*"
        message-converter="productNotificationMessageConverter"  />

<int:chain input-channel="notificationsFromRabbit" output-channel="notificationsToClients">
        <int:service-activator ref="NotificationRouter" 
                      method="addRecipientsHeaders" />
        <int:splitter ref="NotificationRouter" 
                      method="groupMessages" />
        <int:object-to-json-transformer />
</int:chain>

<int-amqp:outbound-channel-adapter channel="notificationsToClients"
        amqp-template="rabbitTemplate"
        exchange-name="${servicesClientsExchange}"
        routing-key=""
        mapped-request-headers="*"  />

At the moment we are acking the messages in the groupMessages method by passing the Channel and Delivery tag as paramters. But, if the broker never sends a return or returns with ack=false then it is too late to nack the message from the inbound channel adapter.

Am I going to need a bean that keeps a Map<Channel, Long> of the channel and delivery tags to access in the confirm-callback or is there some other way?

Is the channel from the inbound channel adapter going to be closed by the time I receive a publisher confirm?

1

There are 1 answers

3
Gary Russell On BEST ANSWER

As long as you suspend the consumer thread until all the acks/nacks have been received, you can do what you want.

If you make notificationsFromRabbit a publish-subscribe channel you can add another subscriber (service-activator) where you suspend the thread; wait for all the acks/nacks and take the action you desire.

EDIT:

You can also use Spring Integration to manage the acks for you and it will emit them as messages from the outbound adapter (rather than using a callback yourself).

EDIT2:

You could then use the splitter's sequence size/sequence number headers in your correlation data, enabling the release of the consumer when all the acks are received.

EDIT3:

Something like this should work...

On the outbound adapter, set confirm-correlation-expression="#this" (the whole outbound message).

Class with two methods

private final Map<String, BlockingQueue<Boolean> suspenders;

public void suspend(Message<?> original) {
    BlockingQueue<Boolean> bq = new LinkedBlockingQueue();
    String key = someKeyFromOriginal(original);
    suspenders.put(key, bq);
    Boolean result = bq.poll(// some timeout);
    try {
        if (result == null) {
            // timed out
        } 
        else if (!result) {
            // throw some exception to nack the message
        }
    }
    finally {
        suspenders.remove(key);
    }
}

public void ackNack(Message<Message<?>> ackNak) {
    Message<?> theOutbound = ackNak.payload;
    BlockingQueue<Boolean> bq = suspenders.get(someKeyFromOriginal(theOutbound));
    if (bq == null) // late ack/nack; ignore
    else {
        // check the ack/nack header
        // if nack, bq.put(false)
        // else, use another map field, to 
        // keep track of ack count Vs sequenceSize header in 
        // theOutbound; when all acks received, bq.put(true);
    }
}

Suspend the consumer thread in the first method; route the acks/nacks from the outbound adapter to the second method.

Caveat: This is not tested, just off the top of my head; but it should be pretty close.