In our application we are moving from spring-amqp to reactor-rabbitmq to work well with reactive nature of the application. We have been reading the official guide from project reactor. However I am not quite sure, how to send the messages to dead-letter queues once the retries are exhausted.
In the previous implementation we are throwing Spring-AMPQ provided AmqpRejectAndDontRequeueException, which causes the messages to go to dead-letter queue automatically, without using a dedicated publisher to the dead letter queue. How to do similar in reactor-rabbitmq? Do I need to write a dedicated publisher which gets called from the listeners after the retries are exhausted or there are other ways to handle it. Also, is there an official project reactor documentation for DLQs and parking queues.
Here is some code samples for both versions:
AMQP Version:
@AllArgsConstructor
public class SampleListener {
private static final Logger logger = LoggerFactory.getLogger(SampleListener.class);
private final MessageListenerContainerFactory messageListenerContainerFactory;
private final Jackson2JsonMessageConverter converter;
@PostConstruct
public void subscribe() {
var mlc = messageListenerContainerFactory
.createMessageListenerContainer(SAMPLE_QUEUE);
MessageListener messageListener = message -> {
try {
TraceableMessage traceableMessage = (TraceableMessage) converter.fromMessage(message);
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
MyModel myModel = mapper.convertValue(traceableMessage.getMessage(), MyModel.class);
MDC.put(CORRELATION_ID, traceableMessage.getCorrelationId());
logger.info("Received message for id : {}", myModel.getId());
processMessage(myModel)
.subscriberContext(ctx -> {
Optional<String> correlationId = Optional.ofNullable(MDC.get(CORRELATION_ID));
return correlationId.map(id -> ctx.put(CORRELATION_ID, id))
.orElseGet(() -> ctx.put(CORRELATION_ID, UUID.randomUUID().toString()));
}).block();
MDC.clear();
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new AmqpRejectAndDontRequeueException(e.getMessage(), e);
}
};
mlc.setupMessageListener(messageListener);
mlc.start();
}
processMessage is doing the business logic, and in case if it fails I want to move it to DLQ. Works fine in case of AMQP.
Reactor RabbitMQ version:
@AllArgsConstructor
public class SampleListener {
private static final Logger logger = LoggerFactory.getLogger(SampleListener.class);
private final MessageListenerContainerFactory messageListenerContainerFactory;
private final Jackson2JsonMessageConverter converter;
@PostConstruct
public void subscribe() {
receiver.consumeAutoAck(SAMPLE_QUEUE)
.subscribe(delivery -> {
TraceableMessage traceableMessage = Serializer.to(delivery.getBody(), TraceableMessage.class);
Mono.just(traceableMessage)
.map(this::extractMyModel)
.doOnNext(myModel -> logger.info("Received message for id : {}", myModel.getId()))
.flatMap(this::processMessage)
.doFinally(signalType -> MDC.clear())
.retryWhen(Retry
.fixedDelay(1, Duration.ofMillis(10000))
.onRetryExhaustedThrow() //Move to DLQ
.doAfterRetry(retrySignal -> {
if ((retrySignal.totalRetries() + 1) >= 1) {
logger.info("Exhausted retries");
//Move to DLQ
}
}))
.subscriberContext(ctx -> ctx.put(CORRELATION_ID, traceableMessage.getCorrelationId()))
.subscribe();
}
);
}
One of those two places with comment//Move to DLQ, would be I am guessing where the message should go to DLQ. That's where I decide I cannot process this anymore. Should there be a different publisher pushing to DLQ or any specific setting can take care of it automatically.
Please let me know.
I got the answer to this problem. I was using async listeners and using the
consumeAutoAck. When I switched toconsumeManualAckI get aAcknowledgebleDeliverywhere I can do anack(false)which is supposed to move it to dead letter queue.