Reactor - Kafka - Consumer stops on error while processing message


I am using Reactor-Kafka 1.2.4. When an error occurs while processing a message, the Kafka consumer stops and does not process any further messages.

For now I just want to log the error and continue with the next messages; later on I will add dead-letter queue (DLQ) handling, etc.

I have since upgraded Reactor-Kafka to 1.3.0. In addition to the upgrade, is the code below good enough?

    // kafkaReceiver is a KafkaReceiver<String, M> instance
    kafkaReceiver
        .receive()
        .retry(3)
        .subscribeOn(Schedulers.single())
        .publishOn(Schedulers.immediate())
        .subscribe(this::processRecord);

    private void processRecord(ReceiverRecord<String, M> record) {
        try {
            // <process record>
        } catch (Exception e) {
            log.error("Error thrown {} while consuming message", e.getMessage());
            // <later on: publish to dead-letter queue>
        } finally {
            // Acknowledge whether or not processing succeeded
            record.receiverOffset().acknowledge();
            log.debug("Successfully acknowledged kafka message");
        }
    }
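To make the intended behaviour concrete, here is a minimal plain-Java sketch of the "log, set aside for the DLQ, acknowledge, continue" pattern I am after. It deliberately uses no Reactor or Kafka types: `SafeRecordProcessor`, `deadLetters()` and `acknowledgedCount()` are hypothetical names made up for illustration, the in-memory list only stands in for a real dead-letter topic, and the counter only stands in for `record.receiverOffset().acknowledge()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Illustrative model only (hypothetical class, not part of Reactor-Kafka):
 * every record is handled inside try/catch/finally so that one failing
 * record does not prevent the following records from being processed.
 */
final class SafeRecordProcessor<T> {
    private final Consumer<T> handler;                      // the business logic
    private final List<T> deadLetters = new ArrayList<>();  // stand-in for a DLQ
    private int acknowledged = 0;                           // stand-in for acknowledge()

    SafeRecordProcessor(Consumer<T> handler) {
        this.handler = handler;
    }

    void process(T record) {
        try {
            handler.accept(record);
        } catch (Exception e) {
            // Log and set the record aside instead of letting the exception escape
            System.err.println("Error thrown " + e.getMessage() + " while consuming message");
            deadLetters.add(record);
        } finally {
            // Counted for successful and failed records alike
            acknowledged++;
        }
    }

    List<T> deadLetters() {
        return new ArrayList<>(deadLetters); // defensive copy
    }

    int acknowledgedCount() {
        return acknowledged;
    }
}
```

In this model a failing record ends up in `deadLetters()` while the caller's loop keeps going. Note that only `Exception` is caught, so an `Error` (for example `OutOfMemoryError`) would still propagate to the caller.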

Thank you.
