I am new to Spring Kafka and am trying to implement retries on failure (or on any exception during Kafka message processing) using Spring Kafka with a RetryTemplate.
I have used the following code:
```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

// This is the KafkaListenerContainerFactory:
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryRetry() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRetryTemplate(retryTemplate());
    factory.setRecoveryCallback(retryContext -> {
        // The retrying listener adapter stores the failed record under the "record" attribute
        ConsumerRecord<?, ?> consumerRecord = (ConsumerRecord<?, ?>) retryContext.getAttribute("record");
        logger.info("Recovery is called for message {} ", consumerRecord.value());
        return Optional.empty();
    });
    return factory;
}

// Retry template
public RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
    // Todo: take from config
    fixedBackOffPolicy.setBackOffPeriod(240000); // 240 seconds
    retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
    SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
    // Todo: take from config
    simpleRetryPolicy.setMaxAttempts(3);
    retryTemplate.setRetryPolicy(simpleRetryPolicy);
    return retryTemplate;
}

// This is the consumerFactory:
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}
```
When an exception occurs, the message is retried as expected according to the retry policy. Once the maximum number of attempts is exhausted, the recovery callback is called. But soon after that, I get "java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available", with details such as: Failing OffsetCommit request since the consumer is not part of an active group.
It seems that the offset cannot be committed because the consumer has been removed from the group: it spent a long time (backOffPeriod * (maxAttempts - 1), i.e. 240 s * 2 = 480 s with the settings above) without calling poll().
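As a rough check of that arithmetic, here is a minimal plain-Java sketch (no Kafka dependency) that computes the worst-case back-off a RetryTemplate with a fixed back-off spends on the consumer thread and compares it with Kafka's default max.poll.interval.ms of 300000 ms (5 minutes). The class and method names and the 60-second margin for processing time are hypothetical choices for illustration; with kafka-clients on the classpath you would use ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG rather than the literal key.

```java
import java.util.HashMap;
import java.util.Map;

class PollIntervalCheck {

    // Kafka's default value of max.poll.interval.ms: 300000 ms (5 minutes).
    static final long DEFAULT_MAX_POLL_INTERVAL_MS = 300_000L;

    // Hypothetical safety margin for the listener's own processing time.
    static final long PROCESSING_MARGIN_MS = 60_000L;

    // With a fixed back-off, maxAttempts attempts are separated by
    // (maxAttempts - 1) sleeps, all on the consumer thread, with no poll() in between.
    static long aggregateBackOffMs(long backOffPeriodMs, int maxAttempts) {
        return backOffPeriodMs * (maxAttempts - 1);
    }

    // True if the aggregate back-off plus the margin stays below the poll interval.
    static boolean fitsInPollInterval(long aggregateMs, long maxPollIntervalMs, long marginMs) {
        return aggregateMs + marginMs < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // Values from the question: 240 s back-off, 3 attempts.
        long aggregate = aggregateBackOffMs(240_000L, 3);
        System.out.println("aggregate back-off: " + aggregate + " ms");
        System.out.println("fits default max.poll.interval.ms: "
                + fitsInPollInterval(aggregate, DEFAULT_MAX_POLL_INTERVAL_MS, PROCESSING_MARGIN_MS));

        // Keeping the RetryTemplate would require raising max.poll.interval.ms
        // above the aggregate delay plus processing time (an int-valued property).
        Map<String, Object> props = new HashMap<>();
        props.put("max.poll.interval.ms", (int) (aggregate + PROCESSING_MARGIN_MS));
        System.out.println(props);
    }
}
```

With the question's settings the aggregate is 480000 ms, which already exceeds the 300000 ms default before any processing time is counted, so a rebalance is expected unless max.poll.interval.ms is raised.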
Do I need to set max.poll.interval.ms to some large value?
Is there any other way to avoid this commit failure, even when processing takes a long time and retries are scheduled with a long interval?
Please help me with this.
The aggregate back-off delay must be less than max.poll.interval.ms to avoid a rebalance. It is now preferred to use a SeekToCurrentErrorHandler instead of a RetryTemplate, because then only each individual delay (instead of the aggregate) needs to be less than max.poll.interval.ms.
Documentation here.
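A minimal sketch of the SeekToCurrentErrorHandler approach, assuming Spring for Apache Kafka 2.3 to 2.7 (where the SeekToCurrentErrorHandler(ConsumerRecordRecoverer, BackOff) constructor and factory.setErrorHandler(...) are available). The RetryTemplate and RecoveryCallback are removed from the factory; on failure the error handler seeks back to the failed record so the next poll redelivers it, backing off between deliveries and calling the recoverer (here it only logs, like the question's recovery callback) once retries are exhausted. FixedBackOff's second argument counts retries, not total attempts, so FixedBackOff(240000L, 2L) gives the same three deliveries as setMaxAttempts(3). The class name KafkaRetryConfig is hypothetical, and the ConsumerFactory is assumed to be available as a bean (for example the question's consumerFactory() annotated with @Bean).

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaRetryConfig {

    private static final Logger logger = LoggerFactory.getLogger(KafkaRetryConfig.class);

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryRetry(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        // No setRetryTemplate()/setRecoveryCallback(): retries are driven by the
        // error handler, which re-seeks the failed record so the next poll()
        // redelivers it. Only one back-off period (plus processing) elapses between polls.
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
                // Recoverer: called once retries are exhausted (here it only logs).
                (ConsumerRecord<?, ?> record, Exception ex) ->
                        logger.info("Recovery is called for message {}", record.value()),
                // 240 s between deliveries; 2 retries => 3 deliveries in total.
                new FixedBackOff(240_000L, 2L));
        factory.setErrorHandler(errorHandler);
        return factory;
    }
}
```

In 2.8 and later, SeekToCurrentErrorHandler is deprecated; the equivalent is new DefaultErrorHandler(recoverer, backOff) registered with factory.setCommonErrorHandler(...). Either way, a single 240-second delay plus processing time must fit within max.poll.interval.ms, and 240000 ms is below the 300000 ms default.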