Kafka consumer graceful shutdown in Spring Cloud Stream

I can't figure out the proper way to shut down my Kafka consumers gracefully in Spring Boot with the Spring Cloud Stream Kafka binder. I have an event handler defined in code and registered through properties. Under high load, I see about 600 messages consumed per second, but when Kubernetes sends the termination signal, the code is interrupted and begins to throw exceptions.

I'm using Tomcat because of code shared with the producers, but this particular consumer receives no incoming HTTP traffic. It only accepts data through the event-driven interface; there is no manual polling. The handler loop may be in the middle of writing up to 500 documents to the database when Spring kills it. When this happens, the logs look something like this:

    2023-10-18T17:10:22.973Z  INFO 1 --- [ionShutdownHook] s.i.k.i.KafkaMessageDrivenChannelAdapter : stopped org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@147efd9
    2023-10-18T17:10:22.975Z  INFO 1 --- [ionShutdownHook] o.s.c.stream.binder.BinderErrorChannel   : Channel 'prod-consumer.kafka-749625255.eventHandler-in-0.errors' has 0 subscriber(s).
    2023-10-18T17:10:22.987Z  INFO 1 --- [container-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : my_consumer_10: Consumer stopped
    2023-10-18T17:10:22.989Z  INFO 1 --- [ionShutdownHook] s.i.k.i.KafkaMessageDrivenChannelAdapter : stopped org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@609b041c
    2023-10-18T17:10:22.995Z  INFO 1 --- [container-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : my_consumer_10: Consumer stopped
    2023-10-18T17:10:22.995Z  INFO 1 --- [ionShutdownHook] s.i.k.i.KafkaMessageDrivenChannelAdapter : stopped org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@75ed125a
    2023-10-18T17:10:22.996Z  INFO 1 --- [ionShutdownHook] o.s.b.w.e.tomcat.GracefulShutdown        : Commencing graceful shutdown. Waiting for active requests to complete
    2023-10-18T17:10:22.998Z  INFO 1 --- [tomcat-shutdown] o.s.b.w.e.tomcat.GracefulShutdown        : Graceful shutdown complete
    2023-10-18T17:10:23.016Z  INFO 1 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
    2023-10-18T17:10:23.016Z  INFO 1 --- [ionShutdownHook] o.s.i.channel.PublishSubscribeChannel    : Channel 'prod-consumer.errorChannel' has 0 subscriber(s).
    2023-10-18T17:10:23.017Z  INFO 1 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : stopped bean '_org.springframework.integration.errorLogger'
    [   197.512368s]  INFO ThreadId(01) outbound: linkerd_app_core::serve: Connection closed error=connection closed before message completed client.addr=10.8.84.196:38956
    2023-10-18T17:10:23.021Z ERROR 1 --- [container-0-C-1] com.company.queue.EventConsumer     : caught exception adding item to db
    org.apache.http.ConnectionClosedException: Connection closed unexpectedly
    at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:920) ~[elasticsearch-rest-client-8.5.3.jar!/:8.5.3]

From what I can tell, the consumers have been shut down (the application is no longer polling Kafka), but the batch from the current poll, which my eventHandler code is still processing, never finishes.

When I run locally with logging at DEBUG level, I see these additional logs after the consumers stop. Could this be what kills the event handler while it is still in progress?

    2023-10-18T21:31:14.227-05:00 DEBUG 49906 --- [ionShutdownHook] .n.c.PoolingNHttpClientConnectionManager : Connection manager is shutting down
    2023-10-18T21:31:14.228-05:00 DEBUG 49906 --- [ionShutdownHook] h.i.n.c.ManagedNHttpClientConnectionImpl : http-outgoing-0 0:0:0:0:0:0:0:1:57815<->10.20.18.254:443[ACTIVE][r:r][ACTIVE][r][NOT_HANDSHAKING][0][0][0]: Close
    2023-10-18T21:31:14.228-05:00 DEBUG 49906 --- [ionShutdownHook] h.i.n.c.ManagedNHttpClientConnectionImpl : http-outgoing-1 0:0:0:0:0:0:0:1:57820<->10.20.18.254:443[ACTIVE][r:r][ACTIVE][r][NOT_HANDSHAKING][0][0][0]: Close
    2023-10-18T21:31:14.228-05:00 DEBUG 49906 --- [/O dispatcher 1] o.a.h.i.nio.client.InternalIODispatch    : http-outgoing-0 [CLOSED]: Disconnected
    2023-10-18T21:31:14.228-05:00 DEBUG 49906 --- [/O dispatcher 2] o.a.h.i.nio.client.InternalIODispatch    : http-outgoing-1 [CLOSED]: Disconnected
    2023-10-18T21:31:14.228-05:00 DEBUG 49906 --- [ionShutdownHook] .n.c.PoolingNHttpClientConnectionManager : Connection manager shut down
    2023-10-18T21:31:14.229-05:00 DEBUG 49906 --- [ionShutdownHook] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans on shutdown
    2023-10-18T21:31:14.229-05:00 DEBUG 49906 --- [ionShutdownHook] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans
    2023-10-18T21:31:14.230-05:00 DEBUG 49906 --- [ionShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'
    2023-10-18T21:31:14.231-05:00 DEBUG 49906 --- [ionShutdownHook] s.c.a.AnnotationConfigApplicationContext : Closing kafka_context, started on Wed Oct 18 21:30:54 CDT 2023, parent: org.springframework.boot.web.servlet.context.AnnotationConfigServletWebServerApplicationContext@3681037

If that is the case, where can I hook into the shutdown sequence to delay it after the consumers have stopped but before the application closes its outgoing connections? I've tried @PreDestroy on the consumer bean, but that runs too late. I tried onApplicationEvent(contextClosedEvent: ContextClosedEvent), but that is called before the consumers are stopped. I also tried adding a Tomcat customizer to hook into the graceful shutdown, but production runs it in a different order than my local environment does.
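Any such hook needs a way to know whether a batch is still being written. As a hedged illustration only, here is a minimal, hypothetical InFlightTracker (not part of Spring, Spring Cloud Stream, or the Kafka binder) that counts in-progress batches using java.util.concurrent and lets a shutdown step block until they finish or a timeout passes:

```kotlin
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

/**
 * Hypothetical helper (not a Spring or Kafka API): counts units of work that are
 * currently running so that a shutdown step can wait for them to finish.
 */
class InFlightTracker {
    private val lock = ReentrantLock()
    private val idle = lock.newCondition()
    private var inFlight = 0

    /** Runs [block] while counting it as in flight; the count drops even if it throws. */
    fun <T> track(block: () -> T): T {
        lock.withLock { inFlight++ }
        try {
            return block()
        } finally {
            lock.withLock {
                inFlight--
                // Wake any thread blocked in awaitIdle once the last unit finishes.
                if (inFlight == 0) idle.signalAll()
            }
        }
    }

    /** Current number of in-flight units of work. */
    fun inFlightCount(): Int = lock.withLock { inFlight }

    /**
     * Blocks until nothing is in flight or [timeout] elapses.
     * Returns true if all work drained, false if the timeout was reached first.
     */
    fun awaitIdle(timeout: Long, unit: TimeUnit): Boolean {
        var remainingNanos = unit.toNanos(timeout)
        return lock.withLock {
            while (inFlight > 0) {
                if (remainingNanos <= 0L) return false
                // awaitNanos returns the time still left (<= 0 once it has timed out).
                remainingNanos = idle.awaitNanos(remainingNanos)
            }
            true
        }
    }
}
```

In the application, the body of eventHandler could be wrapped in tracker.track { ... }, and a hypothetical SmartLifecycle bean could call tracker.awaitIdle(...) from its stop() method. When a context closes, Spring publishes ContextClosedEvent, then stops lifecycle beans from the highest phase to the lowest, and only afterwards destroys singletons (typically when HTTP clients such as the Elasticsearch RestClient get closed). Such a bean would therefore need a phase lower than that of the Kafka input bindings; the bindings' actual phase value is an assumption to verify against the Spring Cloud Stream version in use.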

My consumer loop looks like this:

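    import java.util.function.Consumer
    import org.springframework.context.annotation.Bean

    // Declared inside a Spring @Configuration class
    @Bean
    fun eventHandler(): Consumer<List<QueueEvent>> {
        return Consumer { values: List<QueueEvent> ->
            // Add each item from values to the database
            // Connection closed unexpectedly sometimes here, during shutdown
        }
    }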

and the properties look like this:

    spring.cloud.function.definition: eventHandler
    spring.cloud.stream.bindings.eventHandler-in-0: <consumer_settings here>

I should add that I use Linkerd in the container. I originally had a problem with Linkerd shutting down too early during termination, and fixed it by adding a 70-second delay to its shutdown. But these errors seem to happen at a point in the application lifecycle where I think the in-progress work should already have completed.
