I am new to Spring Kafka and am trying to implement retries for failures or exceptions during Kafka message processing, using the Spring Kafka RetryTemplate.

I have used the following code:

//This is the KafkaListenerContainerFactory:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryRetry() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRetryTemplate(retryTemplate());
    factory.setRecoveryCallback(retryContext -> {
        // The failed record is stored in the retry context under the "record" attribute
        ConsumerRecord<String, String> consumerRecord =
                (ConsumerRecord<String, String>) retryContext.getAttribute("record");
        logger.info("Recovery is called for message {} ", consumerRecord.value());
        return Optional.empty();
    });
    return factory;
}

// Retry template

@Bean
public RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
    // TODO: take from config
    fixedBackOffPolicy.setBackOffPeriod(240_000L); // 240 seconds between attempts
    retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
    SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
    // TODO: take from config
    simpleRetryPolicy.setMaxAttempts(3);
    retryTemplate.setRetryPolicy(simpleRetryPolicy);
    return retryTemplate;
}
    

//This is the consumerFactory:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

When an exception occurs, the message is retried as expected per the retry policy. Once the max retries are exhausted, the recovery callback is invoked. But soon after that, it fails with "java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available", with a detail message like: Failing OffsetCommit request since the consumer is not part of an active group.

It seems the offset cannot be committed because the consumer has been kicked out of the group: it was idle for a long time (backOffPeriod * (maxAttempts - 1)) before the next poll.
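To make the numbers concrete, here is a quick sanity check of that idle time against Kafka's documented default for max.poll.interval.ms (300000 ms, i.e. 5 minutes):

```java
public class RetryDelayCheck {
    public static void main(String[] args) {
        long backOffPeriodMs = 240_000L;          // fixedBackOffPolicy.setBackOffPeriod(240000)
        int maxAttempts = 3;                      // simpleRetryPolicy.setMaxAttempts(3)
        long defaultMaxPollIntervalMs = 300_000L; // Kafka default max.poll.interval.ms (5 minutes)

        // The listener thread sleeps between attempts, so between two polls
        // it can stall for up to backOffPeriod * (maxAttempts - 1):
        long aggregateDelayMs = backOffPeriodMs * (maxAttempts - 1);

        System.out.println(aggregateDelayMs);                            // 480000
        System.out.println(aggregateDelayMs > defaultMaxPollIntervalMs); // true -> consumer is evicted
    }
}
```

So with these settings the poll loop stalls for 480 seconds, well past the 300-second default, which matches the rebalance being observed.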

Do I need to set max.poll.interval.ms to some large value?
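For reference, that would just be one more entry in the existing consumerFactory props (the 600000 here is an arbitrary example value, and it only treats the symptom):

```java
// Allow up to 10 minutes between polls before the broker evicts the consumer
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
```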

Is there any other way to achieve this, so that the commit-failed error does not occur even when the consumer takes a long time processing and retries are scheduled with long intervals?

Please help me on this.

1 Answer

Gary Russell (best answer):

The aggregate back-off delay must be less than max.poll.interval.ms to avoid a rebalance.

It is now preferred to use a SeekToCurrentErrorHandler instead of a RetryTemplate, because then only each individual delay (instead of the aggregate) needs to be less than max.poll.interval.ms.

Documentation here.
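A minimal sketch of what that could look like, assuming spring-kafka 2.3+ (where SeekToCurrentErrorHandler accepts a recoverer and a BackOff); the logger and consumerFactory() are the question's own, and the delay/attempt values mirror the question's RetryTemplate:

```java
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryRetry() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // No RetryTemplate: the error handler seeks back to the failed record and the
    // container re-polls it, so the consumer keeps calling poll() between attempts.
    // Only each individual delay (240s here) must stay below max.poll.interval.ms.
    factory.setErrorHandler(new SeekToCurrentErrorHandler(
            (record, exception) -> logger.info("Recovery is called for message {} ", record.value()),
            new FixedBackOff(240_000L, 2))); // 240s between attempts, 2 retries = 3 attempts total
    return factory;
}
```

Note that FixedBackOff's second argument is the number of retries after the first failure, so FixedBackOff(240_000L, 2) gives three delivery attempts in total, matching setMaxAttempts(3) above.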