Why does lag not increase when an exception occurs in a Spring Kafka consumer?


I'm using Spring Kafka, version 2.6.x, and I have a question.

I set enable.auto.commit to false. I then forced an error in the consumer that processes the message. Since I never commit manually, I expected the offset to stay uncommitted and the lag to increase. However, the lag did not increase. What does the existing default error handler do in this case?

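import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.support.serializer.JsonDeserializer

@Configuration
class KafkaConsumerConfig {

    // Consumer factory with JSON values; auto commit is disabled on the Kafka client.
    @Bean
    fun multiTypeConsumerFactory(): ConsumerFactory<String, Any> {
        val props = HashMap<String, Any>()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java
        props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
        return DefaultKafkaConsumerFactory(props)
    }

    // Listener container factory; no error handler or ack mode is set, so the defaults apply.
    @Bean
    fun multiTypeKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, Any> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.setConsumerFactory(multiTypeConsumerFactory())
        return factory
    }
}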

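import org.springframework.kafka.annotation.KafkaHandler
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

// No containerFactory is specified here, so the bean named
// "kafkaListenerContainerFactory" is looked up for this listener.
@Component
@KafkaListener(topics = ["testTopic"], groupId = "group_sdas")
class Consumer {

    // Invoked when the deserialized payload is a Foo.
    @KafkaHandler
    fun handleFoo(message: Foo) {
        println("Received Message in group foo: $message")
    }

    // Invoked when the deserialized payload is a Bar; always fails on purpose.
    @KafkaHandler
    fun handleBar(message: Bar) {
        throw RuntimeException("ssstest sss")
    }
}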
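
On the error handler part of the question: as far as I know, with enable.auto.commit set to false the listener container still commits offsets itself (the default AckMode is BATCH), and since 2.5 the default error handler is SeekToCurrentErrorHandler, which redelivers a failed record up to 10 times with no back off and then logs and skips it. Once the record is skipped its offset is committed, so lag would not grow. Below is a minimal sketch of setting the handler explicitly, assuming Spring Kafka 2.6.x. The class name, bean name, and back off values are made up for illustration and are not from the question.

```kotlin
import java.util.function.BiConsumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.listener.SeekToCurrentErrorHandler
import org.springframework.util.backoff.FixedBackOff

// Hypothetical configuration class, for illustration only.
@Configuration
class ErrorHandlingSketchConfig {

    private val log = LoggerFactory.getLogger(ErrorHandlingSketchConfig::class.java)

    @Bean
    fun errorHandlingContainerFactory(
        consumerFactory: ConsumerFactory<String, Any>
    ): ConcurrentKafkaListenerContainerFactory<String, Any> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.setConsumerFactory(consumerFactory)

        // Commit after each record the listener has processed, instead of per poll batch.
        factory.containerProperties.ackMode = ContainerProperties.AckMode.RECORD

        // Recoverer: called once the retries are exhausted. When it returns normally,
        // the record counts as handled and its offset is committed.
        val recoverer = BiConsumer<ConsumerRecord<*, *>, Exception> { record, ex ->
            log.error(
                "Giving up on {}-{}@{}",
                record.topic(), record.partition(), record.offset(), ex
            )
        }

        // 1 second between attempts, 2 retries after the first failure (assumed values).
        // FixedBackOff.UNLIMITED_ATTEMPTS as the second argument would retry forever,
        // so the failed record is never skipped and its offset is never committed.
        factory.setErrorHandler(SeekToCurrentErrorHandler(recoverer, FixedBackOff(1000L, 2L)))
        return factory
    }
}
```

To use it, the listener would need `containerFactory = "errorHandlingContainerFactory"` on its @KafkaListener annotation. With unlimited attempts the consumer stays on the failing record, which is probably the closest to the "lag keeps growing" behavior expected in the question.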

The lag does not increase even if an error occurs because the type id is not included in the header.
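
If the missing type id header is indeed what is going on, one thing that may help is having the producer write a type token that the consumer can map to a class. A rough sketch of matching producer and consumer properties follows. It assumes the producer uses Spring's JsonSerializer and that the payload classes live at the hypothetical names com.example.Foo and com.example.Bar; neither is stated in the question.

```kotlin
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.springframework.kafka.support.serializer.JsonSerializer

// Hypothetical token-to-class mapping; both sides must agree on the tokens.
const val TYPE_MAPPINGS = "foo:com.example.Foo,bar:com.example.Bar"

// Producer side: write the type token into the record headers.
fun producerProps(): Map<String, Any> = mapOf(
    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java,
    JsonSerializer.ADD_TYPE_INFO_HEADERS to true,
    JsonSerializer.TYPE_MAPPINGS to TYPE_MAPPINGS
)

// Consumer side: resolve the token from the headers back to a class.
fun consumerProps(): Map<String, Any> = mapOf(
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
    JsonDeserializer.TRUSTED_PACKAGES to "com.example",
    JsonDeserializer.TYPE_MAPPINGS to TYPE_MAPPINGS
)
```

The consumer map could be passed to DefaultKafkaConsumerFactory in place of the props built in multiTypeConsumerFactory(). With the type resolved, a Bar payload should reach handleBar, at which point the exception and the error handler behavior can actually be observed.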
