How to move to the next offset when a RecordDeserializationException occurs in a reactor-kafka receiver?


When using spring-webflux with a reactor-kafka receiver, how can I manually move/commit the offset when a RecordDeserializationException occurs? From the RecordDeserializationException I can get the partition and offset, but I cannot manually create a ReceiverOffset object that would allow me to commit (its implementation is private).

reactiveKafkaReceiver
        .receiveBatch()
        .onErrorResume(e -> {
            RecordDeserializationException rde = (RecordDeserializationException) e;
            TopicPartition topicPartition = rde.topicPartition();
            long offset = rde.offset();
            // how can I commit this offset?
            return Flux.empty();
        })
        .delayUntil(flux -> flux
                .collectList()
                .delayUntil(this::process)
                .doOnNext(records -> records.forEach(record -> record.receiverOffset()
                        .commit()
                        .subscribeOn(Schedulers.boundedElastic())
                        .subscribe())))
        .retryWhen(Retry.backoff(3, Duration.ofSeconds(2)).transientErrors(true))
        .repeat()
        .subscribe();

Is there any solution here?


There are 2 answers

Mikhail Geyer (best answer)

Following Gary Russell's answer and its link to https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer, here is a solution that turns deserialization errors into a null value (and automatically adds the error details to the Kafka record headers). With this in place, we only need a null check when processing the records in the main receiver stream; see the sketch after the code below.

@Configuration
@RequiredArgsConstructor
public class KafkaConsumerConfig {
    private final KafkaConsumerProperty kafkaConsumerProperty;
    
    @Bean
    public KafkaReceiver<String, KafkaMessageDto> reactiveKafkaReceiver() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperty.getBootstrapServers());
        ...
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        config.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedDeserializationFunction.class);
        config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, KafkaMessageDto.class);

        ReceiverOptions<String, KafkaMessageDto> receiverOptions = ReceiverOptions.<String, KafkaMessageDto>create(config)
                .subscription(Collections.singletonList(kafkaConsumerProperty.getTopicName()));
        return KafkaReceiver.create(receiverOptions);
    }
}


@Slf4j
public class FailedDeserializationFunction implements Function<FailedDeserializationInfo, Object> {
    @Override
    public Object apply(FailedDeserializationInfo info) {
        log.warn("Fail to deserialize kafka message from topic=" + info.getTopic() + ". Set it to null. ",
                info.getException());
        return null;
    }
}
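
For illustration, here is a minimal sketch of what the main receiver stream could look like with that null check. It is an assumption built on the configuration above, not part of the original answer: receive() is used instead of receiveBatch() for brevity, and process() is a placeholder for the actual business logic.

reactiveKafkaReceiver
        .receive()
        .delayUntil(record -> {
            if (record.value() == null) {
                // Deserialization failed; ErrorHandlingDeserializer put the error
                // details into the record headers. Skip processing, but fall through
                // so the offset below is still committed.
                return Mono.empty();
            }
            return process(record); // placeholder for the actual handling logic
        })
        .delayUntil(record -> record.receiverOffset().commit())
        .subscribe();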
Gary Russell

See this answer for a possible workaround:

How to handle deserialization errors in (spring) reactor-kafka

This prevents the failure and lets you handle the error in your application; the offset is committed as usual.
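
In essence, the workaround keeps the deserializer itself from throwing inside the poll loop. As a rough sketch of that idea (the SafeDeserializer class below is illustrative and not the exact code from the linked answer), a delegating deserializer can catch failures and return null:

import org.apache.kafka.common.serialization.Deserializer;

public class SafeDeserializer<T> implements Deserializer<T> {

    private final Deserializer<T> delegate;

    public SafeDeserializer(Deserializer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return delegate.deserialize(topic, data);
        } catch (Exception ex) {
            // Swallow the failure and return null so the record still reaches
            // the application; its offset is then committed as usual.
            return null;
        }
    }
}

Since this wrapper has no no-arg constructor, it would have to be passed as an instance (for example via ReceiverOptions#withValueDeserializer) rather than configured by class name; Spring's ErrorHandlingDeserializer used in the accepted answer is a ready-made version of the same idea.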