Park XML message in invalid format to AMQP parking lot queue


I have the following IntegrationFlow:

IntegrationFlows.from(
        Amqp.inboundAdapter(rabbitConnectionFactory, QUEUE)
                .messageConverter(new MarshallingMessageConverter(xmlMarshaller))
                .defaultRequeueRejected(false)
                .concurrentConsumers(2)
                .maxConcurrentConsumers(4)
                .channelTransacted(true)
                .errorHandler(new ConditionalRejectingErrorHandler())
)
        .log(INFO, AMQP_LOGGER_CATEGORY)
        .publishSubscribeChannel(s -> s
                .subscribe(f -> f
                        .handle(deathCheckHandler))
                .subscribe(f -> f.handle(service))
        )
        .get();

where deathCheckHandler is

@Component
public class DeathCheckHandler {

    private static final Logger logger = LoggerFactory.getLogger(lookup().lookupClass());

    private static final int RETRY_COUNT = 3;
    private final RabbitTemplate rabbitTemplate;
    private final Jaxb2Marshaller xmlMarshaller;

    public DeathCheckHandler(RabbitTemplate rabbitTemplate, Jaxb2Marshaller xmlMarshaller) {
        this.rabbitTemplate = rabbitTemplate;
        this.xmlMarshaller = xmlMarshaller;
    }

    @ServiceActivator
    public void check(Message<?> message) {
        MessageHeaders headers = message.getHeaders();

        Optional<XDeath> rejected = findAnyRejectedXDeathMessageHeader(headers);
        if (rejected.isPresent()) {
            int rejectedCount = rejected.get().getCount();
            logger.debug("Rejected count is {}", rejectedCount);
            if (rejectedCount > RETRY_COUNT) {
                parkMessage(message);
            }
        }
    }

    private void parkMessage(Message<?> message) {
        Object payload = message.getPayload();
        MessageHeaders headers = message.getHeaders();
        String parkingExchange = (String) headers.get("amqp_receivedExchange");
        String parkingRoutingKey = ((String) headers.get("amqp_consumerQueue")).replace("queue", "plq");
        rabbitTemplate.setMessageConverter(new MarshallingMessageConverter(xmlMarshaller));
        logger.warn("Tried more than {} times. Parking rejected message: {} to exchange {} and routing key {}", RETRY_COUNT, payload, parkingExchange, parkingRoutingKey);
        rabbitTemplate.convertAndSend(parkingExchange, parkingRoutingKey, payload);
        // cause the message to be acknowledged and not routed to DLQ
        throw new ImmediateAcknowledgeAmqpException("Give up retrying message: " + payload);
    }
}

DeathCheckHandler handles the dead-lettering that is set up on the AMQP queues.

How can I park an XML message that is in an incorrect format, i.e. when MarshallingMessageConverter throws UnmarshallingFailureException?

I want to park it the same way I do in DeathCheckHandler#parkMessage.

It should probably be possible with ConditionalRejectingErrorHandler, but I don't know how.

Answer by Gary Russell (best answer)

Clone the ConditionalRejectingErrorHandler.

Use this method as a template...

@Override
public void handleError(Throwable t) {
    log(t);
    if (!this.causeChainContainsARADRE(t) && this.exceptionStrategy.isFatal(t)) {
        if (this.discardFatalsWithXDeath && t instanceof ListenerExecutionFailedException) {
            Message failed = ((ListenerExecutionFailedException) t).getFailedMessage();
            if (failed != null) {
                List<Map<String, ?>> xDeath = failed.getMessageProperties().getXDeathHeader();
                if (xDeath != null && xDeath.size() > 0) {
                    this.logger.error("x-death header detected on a message with a fatal exception; "
                            + "perhaps requeued from a DLQ? - discarding: " + failed);
                    throw new ImmediateAcknowledgeAmqpException("Fatal and x-death present");
                }
            }
        }
        throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", this.rejectManual,
                t);
    }
}

By default, fatal exceptions with an x-death header are discarded via an ImmediateAcknowledgeAmqpException.

It's not easy to subclass and override this method because the fields are private, so it is easiest to copy the class and publish to the parking lot before throwing the ImmediateAcknowledgeAmqpException (IAAE).

I will make some improvements to this class to make it easier to customize/override.

Pull Request.
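
The control flow suggested above (publish to the parking lot, then throw the IAAE) can be modelled roughly as follows. This is only a plain-Java sketch, not the Spring AMQP implementation: ListenerFailure, UnmarshalFailure, AckAndDrop and RejectNoRequeue are hypothetical stand-ins for ListenerExecutionFailedException, UnmarshallingFailureException, ImmediateAcknowledgeAmqpException and AmqpRejectAndDontRequeueException, and the BiConsumer stands in for whatever actually publishes (for example a RabbitTemplate). It assumes the raw message body is parked, since the payload could not be unmarshalled, and it reuses the "queue" to "plq" naming from the question. As a simplification, every other failure is rejected without requeue.

```java
import java.util.function.BiConsumer;

// Plain-Java model of the suggested flow. Every type in here is a
// hypothetical stand-in, not a Spring AMQP class.
class ParkingErrorHandlerSketch {

    /** Stand-in for the listener exception that carries the failed raw message. */
    static class ListenerFailure extends RuntimeException {
        final String consumerQueue;
        final byte[] rawBody;

        ListenerFailure(String consumerQueue, byte[] rawBody, Throwable cause) {
            super("Listener failed", cause);
            this.consumerQueue = consumerQueue;
            this.rawBody = rawBody;
        }
    }

    /** Stand-in for the exception thrown when the XML cannot be unmarshalled. */
    static class UnmarshalFailure extends RuntimeException {
        UnmarshalFailure(String message) {
            super(message);
        }
    }

    /** Stand-in for "acknowledge now, do not dead-letter". */
    static class AckAndDrop extends RuntimeException {
        AckAndDrop(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Stand-in for "reject without requeue", which lets the broker dead-letter. */
    static class RejectNoRequeue extends RuntimeException {
        RejectNoRequeue(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Receives (routingKey, rawBody); hypothetical publishing hook.
    private final BiConsumer<String, byte[]> parkingPublisher;

    ParkingErrorHandlerSketch(BiConsumer<String, byte[]> parkingPublisher) {
        this.parkingPublisher = parkingPublisher;
    }

    void handleError(Throwable t) {
        if (t instanceof ListenerFailure && causedByUnmarshalFailure(t)) {
            ListenerFailure failure = (ListenerFailure) t;
            // Same naming convention as DeathCheckHandler#parkMessage in the question.
            String routingKey = failure.consumerQueue.replace("queue", "plq");
            // Park the raw bytes first, then signal "acknowledge and stop retrying".
            parkingPublisher.accept(routingKey, failure.rawBody);
            throw new AckAndDrop("Parked message with invalid XML", t);
        }
        // Simplification: anything else is rejected without requeue.
        throw new RejectNoRequeue("Error handler converted exception to fatal", t);
    }

    /** Walks the cause chain looking for an unmarshalling failure. */
    static boolean causedByUnmarshalFailure(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof UnmarshalFailure) {
                return true;
            }
        }
        return false;
    }
}
```

In a real copy of ConditionalRejectingErrorHandler, the same branch would sit where the handler currently throws, and the publisher would send the failed message's body to the parking exchange. Because the message is parked before the acknowledge exception is thrown, a failed publish surfaces as an error and the original is not acknowledged.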