Given I have the following IntegrationFlow:
    IntegrationFlows.from(
            Amqp.inboundAdapter(rabbitConnectionFactory, QUEUE)
                .messageConverter(new MarshallingMessageConverter(xmlMarshaller))
                .defaultRequeueRejected(false)
                .concurrentConsumers(2)
                .maxConcurrentConsumers(4)
                .channelTransacted(true)
                .errorHandler(new ConditionalRejectingErrorHandler())
        )
        .log(INFO, AMQP_LOGGER_CATEGORY)
        .publishSubscribeChannel(s -> s
            .subscribe(f -> f
                .handle(deathCheckHandler))
            .subscribe(f -> f.handle(service))
        )
        .get();
where deathCheckHandler is:
    @Component
    public class DeathCheckHandler {

        private static final Logger logger = LoggerFactory.getLogger(lookup().lookupClass());
        private static final int RETRY_COUNT = 3;

        private final RabbitTemplate rabbitTemplate;
        private final Jaxb2Marshaller xmlMarshaller;

        public DeathCheckHandler(RabbitTemplate rabbitTemplate, Jaxb2Marshaller xmlMarshaller) {
            this.rabbitTemplate = rabbitTemplate;
            this.xmlMarshaller = xmlMarshaller;
        }

        @ServiceActivator
        public void check(Message<?> message) {
            MessageHeaders headers = message.getHeaders();
            Optional<XDeath> rejected = findAnyRejectedXDeathMessageHeader(headers);
            if (rejected.isPresent()) {
                int rejectedCount = rejected.get().getCount();
                logger.debug("Rejected count is {}", rejectedCount);
                if (rejectedCount > RETRY_COUNT) {
                    parkMessage(message);
                }
            }
        }

        private void parkMessage(Message<?> message) {
            Object payload = message.getPayload();
            MessageHeaders headers = message.getHeaders();
            String parkingExchange = (String) headers.get("amqp_receivedExchange");
            String parkingRoutingKey = ((String) headers.get("amqp_consumerQueue")).replace("queue", "plq");
            rabbitTemplate.setMessageConverter(new MarshallingMessageConverter(xmlMarshaller));
            logger.warn("Tried more than {} times. Parking rejected message: {} to exchange {} and routing key {}",
                    RETRY_COUNT, payload, parkingExchange, parkingRoutingKey);
            rabbitTemplate.convertAndSend(parkingExchange, parkingRoutingKey, payload);
            // cause the message to be acknowledged and not routed to DLQ
            throw new ImmediateAcknowledgeAmqpException("Give up retrying message: " + payload);
        }
    }
DeathCheckHandler handles dead-lettering, which is set up on the AMQP queues.
How can I park an XML message that is in an incorrect format, i.e. when MarshallingMessageConverter throws UnmarshallingFailureException?
I want to park it in a similar way to how I do it in DeathCheckHandler#parkMessage.
It should probably be possible with ConditionalRejectingErrorHandler, but I don't know how.
Clone the ConditionalRejectingErrorHandler. Use this method as a template...
By default, fatal exceptions with an x-death header are discarded via an ImmediateAcknowledgeAmqpException. It's not easy to subclass and override this method because the fields are private, so it would be easiest to just copy this class (and publish to the parking lot before throwing the ImmediateAcknowledgeAmqpException).
I will make some improvements to this class to make it easier to customize/override.
Pull Request.
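
The "publish to the parking lot, then throw" idea can be sketched without any Spring dependency. What follows is only a model of the decision logic, not the actual ConditionalRejectingErrorHandler source: RawMessage, ConversionFailure, ListenerFailure, AcknowledgeAndDiscard and ParkingLot are all hypothetical stand-ins for the Spring AMQP message, the conversion exception, the listener wrapper exception, ImmediateAcknowledgeAmqpException and RabbitTemplate respectively. It also assumes a malformed message should be parked on its first failure rather than only once an x-death header is present, and it reuses the "queue" to "plq" routing-key convention from DeathCheckHandler#parkMessage.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Dependency-free model of a "park, then acknowledge" error handler.
 * Every nested type is a hypothetical stand-in, not a Spring AMQP class.
 */
final class ParkingErrorHandlerSketch {

    /** Stand-in for the raw AMQP message (body plus the two properties used for routing). */
    static final class RawMessage {
        final byte[] body;
        final String receivedExchange;
        final String consumerQueue;

        RawMessage(byte[] body, String receivedExchange, String consumerQueue) {
            this.body = body;
            this.receivedExchange = receivedExchange;
            this.consumerQueue = consumerQueue;
        }
    }

    /** Stand-in for a conversion failure raised by the message converter. */
    static final class ConversionFailure extends RuntimeException {
        ConversionFailure(String message) { super(message); }
    }

    /** Stand-in for the listener wrapper exception that carries the failed message. */
    static final class ListenerFailure extends RuntimeException {
        final RawMessage failedMessage;

        ListenerFailure(RawMessage failedMessage, Throwable cause) {
            super("Listener failed", cause);
            this.failedMessage = failedMessage;
        }
    }

    /** Stand-in for ImmediateAcknowledgeAmqpException: ack the message, skip the DLQ. */
    static final class AcknowledgeAndDiscard extends RuntimeException {
        AcknowledgeAndDiscard(String message, Throwable cause) { super(message, cause); }
    }

    /** Stand-in for whatever publishes to the parking lot, e.g. a RabbitTemplate. */
    interface ParkingLot {
        void park(String exchange, String routingKey, byte[] body);
    }

    /** A failure is fatal when a conversion failure appears anywhere in the cause chain. */
    static boolean isFatal(Throwable t) {
        for (Throwable c = t.getCause(); c != null; c = c.getCause()) {
            if (c instanceof ConversionFailure) {
                return true;
            }
        }
        return false;
    }

    /** Parks fatal failures and then acknowledges them; leaves everything else alone. */
    static void handleError(Throwable t, ParkingLot parkingLot) {
        if (t instanceof ListenerFailure && isFatal(t)) {
            RawMessage failed = ((ListenerFailure) t).failedMessage;
            // Same naming convention as DeathCheckHandler#parkMessage.
            String routingKey = failed.consumerQueue.replace("queue", "plq");
            // Conversion failed, so there is no payload object: forward the raw bytes.
            parkingLot.park(failed.receivedExchange, routingKey, failed.body);
            throw new AcknowledgeAndDiscard("Parked unconvertible message", t);
        }
        // Non-fatal: return normally so the usual retry/dead-letter path applies.
    }

    public static void main(String[] args) {
        List<String> parked = new ArrayList<>();
        ParkingLot lot = (exchange, key, body) -> parked.add(exchange + " -> " + key);
        RawMessage broken = new RawMessage(
                "<order".getBytes(StandardCharsets.UTF_8), "orders.exchange", "orders.queue");
        try {
            handleError(new ListenerFailure(broken, new ConversionFailure("bad XML")), lot);
        } catch (AcknowledgeAndDiscard e) {
            System.out.println("Acknowledged after parking: " + parked);
        }
    }
}
```

The main difference from DeathCheckHandler#parkMessage is that no converted payload exists when unmarshalling fails, so the raw message body has to be forwarded as-is. In a copied ConditionalRejectingErrorHandler the same step would probably use RabbitTemplate.send(exchange, routingKey, message) with the original message instead of convertAndSend, provided the listener exception in your Spring AMQP version exposes the failed message.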