I am using Reactor-Kafka 1.2.4 and am facing the following issue: when an error occurs while processing a message, the Kafka consumer stops and does not process any further messages.
For now I just want to log the error message and continue processing the next messages; later on I will add DLQ handling, etc.
I have since upgraded Reactor-Kafka to 1.3.0. Beyond the upgrade, is the code below good enough?
    KafkaReceiver
        .receive()
        .retry(3)
        .subscribeOn(Schedulers.single())
        .publishOn(Schedulers.immediate())
        .subscribe(this::processRecord);

    private void processRecord(ReceiverRecord<String, M> record) {
        try {
            <Process record>
        } catch (Exception e) {
            log.error("Error thrown {} while consuming message", e.getMessage());
            <Later on Publish to Dead letter Queue>
        } finally {
            record.receiverOffset().acknowledge();
            log.debug("Successfully acknowledged kafka message");
        }
    }
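To make the intent concrete, here is a minimal plain-Java sketch of the log-and-continue behaviour I am after. It deliberately does not use Reactor Kafka: the class `LogAndContinueSketch`, the `process` and `handle` methods, and the two lists are hypothetical stand-ins for the real processing, logging/DLQ, and `acknowledge()` calls. It only illustrates that a failing record should be recorded and acknowledged without stopping the records after it.

```java
import java.util.ArrayList;
import java.util.List;

public class LogAndContinueSketch {
    // Hypothetical stand-ins: in the real consumer these would be
    // receiverOffset().acknowledge() and the logger / dead letter queue.
    final List<String> acknowledged = new ArrayList<>();
    final List<String> failed = new ArrayList<>();

    // Hypothetical business logic that rejects one specific payload.
    void process(String value) {
        if (value.equals("bad")) {
            throw new IllegalStateException("cannot process " + value);
        }
    }

    // Same shape as processRecord above: catch, record the failure, always acknowledge.
    void handle(String value) {
        try {
            process(value);
        } catch (Exception e) {
            failed.add(value);        // placeholder for log.error(...) / DLQ publish
        } finally {
            acknowledged.add(value);  // placeholder for acknowledge()
        }
    }

    public static void main(String[] args) {
        LogAndContinueSketch sketch = new LogAndContinueSketch();
        for (String value : List.of("a", "bad", "c")) {
            sketch.handle(value);
        }
        // prints: acknowledged=[a, bad, c] failed=[bad]
        System.out.println("acknowledged=" + sketch.acknowledged + " failed=" + sketch.failed);
    }
}
```

The point is that the exception never leaves `handle`, so the failure of "bad" does not prevent "c" from being handled. That is the behaviour I expect from the subscriber in the real pipeline.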
Thank you.