I am using the below XML snippet:
<int-amqp:inbound-channel-adapter acknowledge-mode="MANUAL" channel="commandQueue" concurrent-consumers="${commandConsumers:10}"
    queue-names="commands" connection-factory="connectionFactory"/>
<int:channel id="commandQueue"/>
<int:channel id="commands"/>
<int:chain input-channel="commandQueue" output-channel="commands">
    <int:delayer id="commandDelayer" default-delay="30000"/>
    <int:json-to-object-transformer id="commandTransformer" type="com.airwatch.chat.command.Command"/>
</int:chain>
<int:payload-type-router input-channel="commands">
....
....
It's performing these tasks:
- Consume messages from the RabbitMQ queue named 'commands'.
- Delay the message execution by 30 seconds.
- Proceed with further execution for message after the specified delay.
If the message is already present on the commands queue before the application with the above code is started, on startup the application executes the message twice in separate threads.
I think I know why this is happening.
Spring reschedules the messages persisted in the message store of DelayHandler once the application context is completely initialized. Refer to the below code snippet from DelayHandler.java:
public void onApplicationEvent(ContextRefreshedEvent event) {
    if (!this.initialized.getAndSet(true)) {
        this.reschedulePersistedMessages();
    }
}
So if the message was already present in the RabbitMQ queue before application startup, during Spring context initialization the message is picked up from the queue and added to the message store of DelayHandler. Once the context initialization is done, and if in the meantime the message is not released from the message store, the above code snippet reschedules the same message.
When two separate threads handle the same message, the first one to finish should delete the message from the message store, and the other thread should then not proceed with execution.
However, the code below from DelayHandler.java allows the second thread to release the duplicate message, which results in duplicate execution of the same message: because the message store is an instance of SimpleMessageStore, the first condition is already true and there is no further check to stop the execution.
private void doReleaseMessage(Message<?> message) {
    if (this.messageStore instanceof SimpleMessageStore
            || ((MessageStore) this.messageStore).removeMessage(message.getHeaders().getId()) != null) {
        this.messageStore.removeMessageFromGroup(this.messageGroupId, message);
        this.handleMessageInternal(message);
    }
    else {
        if (logger.isDebugEnabled()) {
            logger.debug("No message in the Message Store to release: " + message +
                    ". Likely another instance has already released it.");
        }
    }
}
Is this a bug in Spring Integration?
Oh, well!
That's a really sweet bug.
Thank you for pointing that out!
Please raise a JIRA issue and we'll take care of that in the next release.
I can explain what's going on.
All Spring Integration components start their work from Lifecycle.start(). In your case, <int-amqp:inbound-channel-adapter> receives a message from RabbitMQ and sends it to the integration flow, where it is delayed. Only after start does the application context raise ContextRefreshedEvent. On catching that event, DelayHandler picks up all messages from the messageStore and, as you noticed, reschedules them.
Hence, yes, we may have two scheduled tasks for the same message.
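To make the double scheduling concrete, here is a minimal plain-Java sketch. It does not use Spring Integration at all: the Delayer class, its checkRemoval flag and the releaseTwice helper are hypothetical names invented for illustration. The flag only mimics the "instanceof SimpleMessageStore" condition in doReleaseMessage, and the two release calls stand for the two scheduled tasks.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class DoubleReleaseSketch {

    /** Hypothetical stand-in for a delayer with an in-memory message store. */
    public static final class Delayer {
        private final Map<UUID, String> store = new ConcurrentHashMap<>();
        private final boolean checkRemoval;
        private final AtomicInteger handled = new AtomicInteger();

        public Delayer(boolean checkRemoval) {
            this.checkRemoval = checkRemoval;
        }

        public void store(UUID id, String payload) {
            store.put(id, payload);
        }

        /** Called by each scheduled task when the delay expires. */
        public void release(UUID id) {
            // checkRemoval == false models the SimpleMessageStore case:
            // the guard is always true, so every task handles the message.
            if (!checkRemoval || store.remove(id) != null) {
                store.remove(id);          // no-op if the guard already removed it
                handled.incrementAndGet(); // stands for handleMessageInternal(message)
            }
        }

        public int handledCount() {
            return handled.get();
        }
    }

    /** Stores one message, releases it from two tasks, returns how often it was handled. */
    public static int releaseTwice(boolean checkRemoval) {
        Delayer delayer = new Delayer(checkRemoval);
        UUID id = UUID.randomUUID();
        delayer.store(id, "command");
        delayer.release(id); // task scheduled when the message first arrived
        delayer.release(id); // task scheduled again after ContextRefreshedEvent
        return delayer.handledCount();
    }

    public static void main(String[] args) {
        System.out.println("without removal check: " + releaseTwice(false)); // prints 2
        System.out.println("with removal check: " + releaseTwice(true));     // prints 1
    }
}
```

With the removal check in place, the second task finds nothing to remove and skips the message, which is the behaviour a store with a real removeMessage would give.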
What is funny is that this happens only with the SimpleMessageStore, because it doesn't have a removeMessage function for messages that are stored in groups.
I see several variants as a workaround:
- Delay the start of the <int-amqp:inbound-channel-adapter>. For example, handle the same ContextRefreshedEvent from the <inbound-channel-adapter> and send an @amqpAdapter.start() command message to the <control-bus>.
- Another option is available since Spring Integration 4.1 and its name is Idempotent Receiver. Using that, you can discard the duplicate message, and I guess the idempotentKey is exactly the messageId. A clean Idempotent Receiver pattern!
- One more option is a persistent MessageStore, where we really can rely on the removeMessage operation.
The JIRA ticket on the matter: https://jira.spring.io/browse/INT-3560
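The idea behind the Idempotent Receiver option can be sketched in plain Java. This is not the Spring Integration API: IdempotentHandler and deliver are hypothetical names, and the message id is assumed to be the idempotent key, as guessed above. The sketch only shows how remembering already seen ids lets the second release of the same message be discarded.

```java
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class IdempotentReleaseSketch {

    /** Hypothetical wrapper: each message id reaches the delegate at most once. */
    public static final class IdempotentHandler implements Consumer<UUID> {
        private final Set<UUID> seenIds = ConcurrentHashMap.newKeySet();
        private final Consumer<UUID> delegate;

        public IdempotentHandler(Consumer<UUID> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void accept(UUID messageId) {
            // Set.add returns false if the id was already recorded,
            // so a duplicate release is silently dropped.
            if (seenIds.add(messageId)) {
                delegate.accept(messageId);
            }
        }
    }

    /** Sends the given ids through one handler and returns how many were actually handled. */
    public static int deliver(UUID... messageIds) {
        AtomicInteger handled = new AtomicInteger();
        IdempotentHandler handler = new IdempotentHandler(id -> handled.incrementAndGet());
        for (UUID id : messageIds) {
            handler.accept(id);
        }
        return handled.get();
    }

    public static void main(String[] args) {
        UUID command = UUID.randomUUID();
        // The same id released twice is handled only once.
        System.out.println("handled: " + deliver(command, command)); // prints handled: 1
    }
}
```

The set of seen ids in this sketch grows without bound, so a real setup would need some form of expiry or an external store for the keys.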