Spring Integration: Message released twice after delay

I am using the below XML snippet:

<int-amqp:inbound-channel-adapter acknowledge-mode="MANUAL" channel="commandQueue" concurrent-consumers="${commandConsumers:10}"
                                  queue-names="commands" connection-factory="connectionFactory"/>
<int:channel id="commandQueue"/>
<int:channel id="commands"/>
<int:chain input-channel="commandQueue" output-channel="commands">
    <int:delayer id="commandDelayer" default-delay="30000"/>
    <int:json-to-object-transformer id="commandTransformer" type="com.airwatch.chat.command.Command"/>
</int:chain>

<int:payload-type-router input-channel="commands">
....
....

It's performing these tasks:

  1. Consume messages from the RabbitMQ queue named 'commands'.
  2. Delay the message execution by 30 seconds.
  3. Continue processing the message after the specified delay.

If a message is already in the commands queue before the application with the above code starts, the application processes that message twice on startup, in separate threads.

I think I know why this is happening.

Spring reschedules the messages persisted in the message store of DelayHandler once the application context is completely initialized. Refer to the below code snippet from DelayHandler.java:

public void onApplicationEvent(ContextRefreshedEvent event) {
    if (!this.initialized.getAndSet(true)) {
        this.reschedulePersistedMessages();
    }
}

So if the message was already present in the RabbitMQ queue before application startup, during Spring context initialization the message is picked up from the queue and added to the message store of DelayHandler. Once the context initialization is done, and if in the meantime the message is not released from the message store, the above code snippet reschedules the same message.

When two separate threads are about to release the same message, the first one to run should delete the message from the message store, and the other thread should then not proceed with execution.

Instead, the below piece of code from DelayHandler.java allows the second thread to release a duplicate. Because the message store is an instance of SimpleMessageStore, the condition is satisfied without any check that the message is still in the store, so the same message is executed twice.

private void doReleaseMessage(Message<?> message) {
    if (this.messageStore instanceof SimpleMessageStore
            || ((MessageStore) this.messageStore).removeMessage(message.getHeaders().getId()) != null) {
        this.messageStore.removeMessageFromGroup(this.messageGroupId, message);
        this.handleMessageInternal(message);
    }
    else {
        if (logger.isDebugEnabled()) {
            logger.debug("No message in the Message Store to release: " + message +
                    ". Likely another instance has already released it.");
        }
    }
}

Is this a bug in Spring Integration?

There is 1 answer

Artem Bilan On

Oh, well!

That's a really sweet bug.

Thank you for pointing that out!

Please raise a JIRA issue and we'll take care of that in the next release.

I can explain what's going on.

All Spring Integration components start their work from Lifecycle.start(). In your case, <int-amqp:inbound-channel-adapter> receives messages from RabbitMQ and sends them to the integration flow, where they are delayed.

Only after start does the application context raise the ContextRefreshedEvent. On catching that event, DelayHandler picks up all messages from the messageStore and, as you noticed, reschedules them.

Hence, yes, we may have two scheduled tasks for the same message.

The funny thing is that this happens only with SimpleMessageStore, because it has no removeMessage function for messages that are stored in groups.
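To make the double release concrete, here is a minimal plain-Java model of it. This is not Spring Integration code: the Store class, the release method and the checkRemoval flag are all made up for illustration, and the two tasks run one after the other so the result is deterministic. It only shows what happens when the "remove before release" guard is bypassed, as it is by the instanceof SimpleMessageStore check in doReleaseMessage.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model, NOT Spring Integration code: all names here are hypothetical.
class DuplicateReleaseModel {

    // Stand-in for a message store that tracks delayed messages by id.
    static final class Store {
        private final Map<String, String> messages = new ConcurrentHashMap<>();

        void add(String id, String payload) {
            messages.put(id, payload);
        }

        // Returns the removed payload, or null if it was already removed.
        String remove(String id) {
            return messages.remove(id);
        }
    }

    // Runs two release tasks for the same message and returns how often it was handled.
    static int release(boolean checkRemoval) {
        Store store = new Store();
        store.add("m1", "command");
        AtomicInteger handled = new AtomicInteger();

        Runnable releaseTask = () -> {
            // With checkRemoval == false the removal check is skipped entirely,
            // which mirrors the "instanceof SimpleMessageStore" short-circuit.
            if (!checkRemoval || store.remove("m1") != null) {
                handled.incrementAndGet();
            }
        };

        releaseTask.run(); // task scheduled when the message first arrived
        releaseTask.run(); // task scheduled again on ContextRefreshedEvent
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("guard bypassed: " + release(false)); // prints "guard bypassed: 2"
        System.out.println("guard enforced: " + release(true));  // prints "guard enforced: 1"
    }
}
```

With the guard enforced, the second task finds nothing to remove and gives up, which is the behaviour you expected from the delayer.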

I see several variants as a workaround:

  1. Delay the start of <int-amqp:inbound-channel-adapter>. For example, handle the same ContextRefreshedEvent with an event <inbound-channel-adapter> and send an @amqpAdapter.start() command message to the <control-bus>.

  2. Another option, available since Spring Integration 4.1, is the Idempotent Receiver. With it you can discard the duplicate message, and I guess the idempotentKey is exactly the messageId. A clean Idempotent Receiver pattern!

  3. One more option is to use a persistent MessageStore, where we really can rely on the removeMessage operation.
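The idea behind option 2 can be sketched in plain Java as follows. This is only an illustration of the pattern, not the Spring Integration Idempotent Receiver API: the IdempotentReceiverSketch class and its methods are hypothetical, and it assumes the message id (a UUID in Spring messages) is used as the idempotent key.

```java
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java sketch of the idea only; this is NOT the Spring Integration API.
class IdempotentReceiverSketch {

    // Ids of messages that have already been accepted (thread-safe set).
    private final Set<UUID> seenIds = ConcurrentHashMap.newKeySet();
    private final AtomicInteger handledCount = new AtomicInteger();

    // Handles a message only the first time its id is seen; returns false for duplicates.
    boolean receive(UUID messageId) {
        if (!seenIds.add(messageId)) {
            return false; // duplicate release: discard
        }
        handledCount.incrementAndGet(); // stand-in for the real downstream processing
        return true;
    }

    int handledCount() {
        return handledCount.get();
    }

    public static void main(String[] args) {
        IdempotentReceiverSketch receiver = new IdempotentReceiverSketch();
        UUID id = UUID.randomUUID();
        System.out.println(receiver.receive(id));    // first release: prints "true"
        System.out.println(receiver.receive(id));    // duplicate release: prints "false"
        System.out.println(receiver.handledCount()); // prints "1"
    }
}
```

Set.add is atomic on the concurrent key set, so the check also holds when the two releases really do arrive on separate threads.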

The JIRA ticket on the matter: https://jira.spring.io/browse/INT-3560