spring integeration amqp version: 5.0.11
goal: when occur fatal exceptions, message will drop. but not fatal, messages will requeue and go retry policy.
but in my case, i have a custom message converter, when my convert occur some non-fatal err, it will requeue always and never go to the retry policy.
i try to read code, AmqpInboundChannelAdapter.Listener#onMessage
when before retry, it convert message, it means that when message convert occur some err, it does not go to retry, will go error handler.
public void onMessage(final Message message, final Channel channel) throws Exception {
boolean retryDisabled = AmqpInboundChannelAdapter.this.retryTemplate == null;
try {
if (retryDisabled) {
createAndSend(message, channel);
}
else {
final org.springframework.messaging.Message<Object> toSend = createMessage(message, channel);
AmqpInboundChannelAdapter.this.retryTemplate.execute(context -> {
StaticMessageHeaderAccessor.getDeliveryAttempt(toSend).incrementAndGet();
setAttributesIfNecessary(message, toSend);
sendMessage(toSend);
return null;
},
(RecoveryCallback<Object>) AmqpInboundChannelAdapter.this.recoveryCallback);
}
}
my code below:
@Bean
public IntegrationFlow EndpointMessageAndConvertModelDlxFlow(
@Qualifier("rabbitmqUnFatalExceptionRetryTemplate") RetryTemplate template,
ConnectionFactory factory,
EndpointCodeDelegatingMessageConverter converter) {
final MailRabbitmqProperties.Queue queue = getQueueConfig(ENDPOINT_BUFFER_DLX_NODE,
ENDPOINT_BUFFER_FUNCTION);
template.setRetryPolicy(endpointMessageExceptionClassifierRetryPolicy()); //fatal err not go retry
return IntegrationFlows.from(Amqp.inboundAdapter(factory, queue.getName())
.configureContainer(smlc -> {
smlc.acknowledgeMode(AcknowledgeMode.AUTO); //
smlc.defaultRequeueRejected(true); //requeue
final ConditionalRejectingErrorHandler errorHandler =
new ConditionalRejectingErrorHandler(
new EndPointMessageFatalExceptionStrategy());
smlc.errorHandler(errorHandler);
})
.retryTemplate(template)
.messageConverter(converter))
.channel(mailActionCreateTopicChannel())
.get();
}
how can i resolve this problem, thanks.
Well, we believe that converter error is really fatal. That's why the conversion from AMQP message is really done outside of the retry loop.
If you are sure that error in your converter is intermittent, consider to include a retry logic into your converter, so when an
AmqpInboundChannelAdapter
callsthis.converter.fromMessage(message)
, yourRetryTemplate
is going to be applied if an exception occurs.See
ProxyFactoryBean
andRetryInterceptorBuilder
if you use one of the out-of-the-box converters. Otherwise a@Retryable
can be applied to theconvert()
method.See more info in the Spring Retry project: https://docs.spring.io/spring-batch/docs/current/reference/html/retry.html