When using spring-webflux
with reactor-kafka
receiver how can I manually move/commit offset when RecordDeserializationException
occurs? From RecordDeserializationException
I can get partition and offset, but I cannot manualy create a ReceiverOffset
object which would allow me to commit (as it has private implementation).
reactiveKafkaReceiver
.receiveBatch()
.onErrorResume(e -> {
RecordDeserializationException rde = (RecordDeserializationException) e;
TopicPartition topicPartition = rde.topicPartition();
long offset = rde.offset();
// how can I commit this offset?
return Flux.empty();
})
.delayUntil(flux -> flux
.collectList()
.delayUntil(this::process)
.doOnNext(records -> records.forEach(record -> record.receiverOffset()
.commit()
.subscribeOn(Schedulers.boundedElastic())
.subscribe())))
.retryWhen((Retry.backoff(3, Duration.ofSeconds(2)).transientErrors(true)))
.repeat()
.subscribe();
Is there any solution here?
Following Gary Russell's answer and its link to https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer here is a solution which transforms deserialization errors into null-value (it also automatically adds error info into kafka headers). In such case we should just add null check condition when we process the records in the main receiver stream.