I'm using a spring kafka and I have a question. The Spring Kafka version is 2.6.X.
I set the auto commit setting to false. At this time, the consumer who consumes the message forced an error. As I expected, the commit was not done manually, so the lag should be increased. However, lag did not increase. What do you do with an existing default error handler?
class KafkaConsumerConfig {
@Bean
fun multiTypeConsumerFactory(): ConsumerFactory<String, Any> {
val props = HashMap<String, Any>()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
return DefaultKafkaConsumerFactory(props)
}
@Bean
fun multiTypeKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, Any> {
val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
factory.setConsumerFactory(multiTypeConsumerFactory())
return factory
}
}
@Component
@KafkaListener(topics = ["testTopic"], groupId = "group_sdas")
class Consumer {
@KafkaHandler
fun handleFoo(message: Foo) {
println("Received Message in group foo: ${message}")
}
@KafkaHandler
fun handleBar(message: Bar) {
throw RuntimeException("ssstest sss")
}
}
The lag does not increase even if an error occurs because the type id is not included in the header.