When following a Consume-Process-Produce pattern for a transactional flow in Spring Boot (2.7.11) Application.
We are getting the following error occasionally, when trying to produce message:
Caused by: java.lang.IllegalStateException: Cannot perform operation after producer has been closed
at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:921)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:930)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:914)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1087)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:655)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:429)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:513)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
... 71 common frames omitted
2024-02-06 15:29:23.774 ERROR [herald-fulfillment-service,,] 1 --- [container-0-C-1] o.s.k.core.DefaultKafkaProducerFactory : Abort failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@796dd26e]
java.lang.IllegalStateException: Cannot perform operation after producer has been closed
at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:921)
at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:785)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.abortTransaction(DefaultKafkaProducerFactory.java:1174)
at org.springframework.kafka.core.KafkaResourceHolder.rollback(KafkaResourceHolder.java:66)
at org.springframework.kafka.transaction.KafkaTransactionManager.doRollback(KafkaTransactionManager.java:194)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processRollback(AbstractPlatformTransactionManager.java:835)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.rollback(AbstractPlatformTransactionManager.java:809)
at org.springframework.transaction.support.TransactionTemplate.rollbackOnException(TransactionTemplate.java:168)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:144)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeInTransaction(KafkaMessageListenerContainer.java:2390)
We have set the transaction id for Kafka using the following property
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix
We have looked into this bug which was closed: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/626 (Which mentions there was an issue where single-shared producer object was being destroyed, causing similar issue) Which seems related to the issue we are facing.
We are currently using these versions:
'org.springframework.boot:2.7.11'
'org.springframework.cloud:spring-cloud-stream:3.2.10'
'org.springframework.cloud:spring-cloud-stream-binder-kafka:3.2.10'
But we still get the same issue from occasionally when we are trying to produce messages in that transactional scope.
We have the following flow:
- Consume messages from Kafka Topic using a Consumer Bean configured in the Stream Cloud Binders as Consumer.
- Now the processing of this message is done under @Transactional scope.
- In the processing we have few JPA Operations of Database update.
- Then we publish few updates to other systems using StreamBridge.
Now the issue is this flow works fine most of the time, but occasionally we get the mentioned issue .