How to move to the next offset when a RecordDeserializationException occurs in a reactor-kafka receiver?


When using spring-webflux with a reactor-kafka receiver, how can I manually move/commit the offset when a RecordDeserializationException occurs? From the RecordDeserializationException I can get the partition and offset, but I cannot manually create a ReceiverOffset object that would let me commit, because its implementation is private.

            .onErrorResume(e -> {
                RecordDeserializationException rde = (RecordDeserializationException) e;
                TopicPartition topicPartition = rde.topicPartition();
                long offset = rde.offset();
                // how can I commit this offset?
                return Flux.empty();
            })
            // ...
            .delayUntil(flux -> flux
                    .doOnNext(records -> records.forEach(record -> record.receiverOffset() /* ... */)))
            .retryWhen((Retry.backoff(3, Duration.ofSeconds(2)).transientErrors(true)))

Is there any solution here?


There are 2 answers

Mikhail Geyer On BEST ANSWER

Following Gary Russell's answer and the question it links to, here is a solution that turns deserialization errors into null values (it also automatically adds the error information to the Kafka headers). With this in place, we only need to add a null check when we process the records in the main receiver stream.

public class KafkaConsumerConfig {
    private final KafkaConsumerProperty kafkaConsumerProperty;

    public KafkaConsumerConfig(KafkaConsumerProperty kafkaConsumerProperty) {
        this.kafkaConsumerProperty = kafkaConsumerProperty;
    }

    public KafkaReceiver<String, KafkaMessageDto> reactiveKafkaReceiver() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperty.getBootstrapServers());
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Wrap the real value deserializer so a bad payload does not fail the whole poll
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        // Called when the delegate fails; its return value becomes the record value
        config.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedDeserializationFunction.class);
        config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, KafkaMessageDto.class);

        // Any further options (such as the topic subscription) are not shown in this snippet
        ReceiverOptions<String, KafkaMessageDto> receiverOptions = ReceiverOptions.<String, KafkaMessageDto>create(config);
        return KafkaReceiver.create(receiverOptions);
    }
}

public class FailedDeserializationFunction implements Function<FailedDeserializationInfo, Object> {
    @Override
    public Object apply(FailedDeserializationInfo info) {
        // `log` is assumed to be an SLF4J logger; the log call was cut off in the snippet,
        // so passing info.getException() as the last argument is an assumption
        log.warn("Fail to deserialize kafka message from topic=" + info.getTopic() + ". Set it to null. ",
                info.getException());
        return null;
    }
}
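
To make the "null check in the main stream" part concrete, here is a minimal, library-free sketch of the idea. It is not reactor-kafka or spring-kafka code: `NullOnFailureSketch`, `nullOnFailure` and `processSkippingNulls` are hypothetical names used only for illustration, and a plain `List` stands in for the receiver stream.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Library-free sketch; all names here are hypothetical, not part of any Kafka API.
class NullOnFailureSketch {

    // Wraps a deserializer so that a failure yields null instead of an exception,
    // which is the role the failed-deserialization function plays in the config above.
    static <T> Function<byte[], T> nullOnFailure(Function<byte[], T> delegate) {
        return bytes -> {
            try {
                return delegate.apply(bytes);
            } catch (RuntimeException e) {
                return null;
            }
        };
    }

    // Stand-in for the main stream: every payload is still consumed,
    // but payloads that deserialized to null are skipped instead of handled.
    static <T> List<T> processSkippingNulls(List<byte[]> payloads, Function<byte[], T> deserializer) {
        List<T> handled = new ArrayList<>();
        for (byte[] payload : payloads) {
            T value = deserializer.apply(payload);
            if (value == null) {
                continue; // deserialization failed: nothing to handle, move on
            }
            handled.add(value);
        }
        return handled;
    }

    public static void main(String[] args) {
        Function<byte[], Integer> parseInt =
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
        List<byte[]> payloads = List.of(
                "1".getBytes(StandardCharsets.UTF_8),
                "not-a-number".getBytes(StandardCharsets.UTF_8),
                "3".getBytes(StandardCharsets.UTF_8));
        System.out.println(processSkippingNulls(payloads, nullOnFailure(parseInt))); // prints [1, 3]
    }
}
```

In the real receiver stream the equivalent check would be on `record.value()`. The offset of the skipped record still has to be acknowledged like any other, otherwise the consumer will not move past it.
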
Gary Russell On

See this answer for a possible workaround:

How to handle deserialization errors in (spring) reactor-kafka

This prevents the failure and lets you deal with the error in your application, while the offset is committed as usual.