spring integeration amqp when convert message from amqp occur err, it does not go retry

92 views Asked by At

spring integeration amqp version: 5.0.11

goal: when occur fatal exceptions, message will drop. but not fatal, messages will requeue and go retry policy.

but in my case, i have a custom message converter, when my convert occur some non-fatal err, it will requeue always and never go to the retry policy.

i try to read code, AmqpInboundChannelAdapter.Listener#onMessage when before retry, it convert message, it means that when message convert occur some err, it does not go to retry, will go error handler.

public void onMessage(final Message message, final Channel channel) throws Exception {
        boolean retryDisabled = AmqpInboundChannelAdapter.this.retryTemplate == null;
        try {
            if (retryDisabled) {
                createAndSend(message, channel);
            }
            else {
                 final org.springframework.messaging.Message<Object> toSend = createMessage(message, channel); 
                AmqpInboundChannelAdapter.this.retryTemplate.execute(context -> {
                            StaticMessageHeaderAccessor.getDeliveryAttempt(toSend).incrementAndGet();
                            setAttributesIfNecessary(message, toSend);
                            sendMessage(toSend);
                            return null;
                        },
                        (RecoveryCallback<Object>) AmqpInboundChannelAdapter.this.recoveryCallback);
            }
        }

my code below:

  @Bean
public IntegrationFlow EndpointMessageAndConvertModelDlxFlow(
    @Qualifier("rabbitmqUnFatalExceptionRetryTemplate") RetryTemplate template,
    ConnectionFactory factory,
    EndpointCodeDelegatingMessageConverter converter) {
    final MailRabbitmqProperties.Queue queue = getQueueConfig(ENDPOINT_BUFFER_DLX_NODE,
        ENDPOINT_BUFFER_FUNCTION);
    template.setRetryPolicy(endpointMessageExceptionClassifierRetryPolicy()); //fatal err not go retry
    return IntegrationFlows.from(Amqp.inboundAdapter(factory, queue.getName())
        .configureContainer(smlc -> {
            smlc.acknowledgeMode(AcknowledgeMode.AUTO); //
            smlc.defaultRequeueRejected(true); //requeue
            final ConditionalRejectingErrorHandler errorHandler =
                new ConditionalRejectingErrorHandler(
                    new EndPointMessageFatalExceptionStrategy());
            smlc.errorHandler(errorHandler);
        })
        .retryTemplate(template) 
        .messageConverter(converter))
        .channel(mailActionCreateTopicChannel())
        .get();
}

how can i resolve this problem, thanks.

1

There are 1 answers

0
Artem Bilan On

Well, we believe that converter error is really fatal. That's why the conversion from AMQP message is really done outside of the retry loop.

If you are sure that error in your converter is intermittent, consider to include a retry logic into your converter, so when an AmqpInboundChannelAdapter calls this.converter.fromMessage(message), your RetryTemplate is going to be applied if an exception occurs.

See ProxyFactoryBean and RetryInterceptorBuilder if you use one of the out-of-the-box converters. Otherwise a @Retryable can be applied to the convert() method.

See more info in the Spring Retry project: https://docs.spring.io/spring-batch/docs/current/reference/html/retry.html