Spring Cloud Stream Binder Kafka: Cannot perform operation after producer has been closed


We follow a consume-process-produce pattern for a transactional flow in a Spring Boot (2.7.11) application. Occasionally, when trying to produce a message, we get the following error:

Caused by: java.lang.IllegalStateException: Cannot perform operation after producer has been closed
        at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:921)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:930)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:914)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1087)
        at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:655)
        at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:429)
        at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:513)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
        ... 71 common frames omitted
 
2024-02-06 15:29:23.774 ERROR [herald-fulfillment-service,,] 1 --- [container-0-C-1] o.s.k.core.DefaultKafkaProducerFactory   : Abort failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@796dd26e]
 
java.lang.IllegalStateException: Cannot perform operation after producer has been closed
        at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:921)
        at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:785)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.abortTransaction(DefaultKafkaProducerFactory.java:1174)
        at org.springframework.kafka.core.KafkaResourceHolder.rollback(KafkaResourceHolder.java:66)
        at org.springframework.kafka.transaction.KafkaTransactionManager.doRollback(KafkaTransactionManager.java:194)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.processRollback(AbstractPlatformTransactionManager.java:835)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.rollback(AbstractPlatformTransactionManager.java:809)
        at org.springframework.transaction.support.TransactionTemplate.rollbackOnException(TransactionTemplate.java:168)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:144)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeInTransaction(KafkaMessageListenerContainer.java:2390)
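
To make the order of the two traces easier to follow, here is a toy model in plain Java. It is only a sketch of what the traces seem to show, not Kafka or Spring code: `ToyProducer`, `runTransaction` and `ClosedProducerDemo` are hypothetical names, and the explicit `close()` call stands in for whatever closes the real producer, which is the part we do not know. It assumes that once the producer is closed, both `send` and `abortTransaction` fail the same closed check, as `throwIfProducerClosed` does in the traces.

```java
import java.util.ArrayList;
import java.util.List;

public class ClosedProducerDemo {

    /** Toy stand-in for a transactional producer; NOT the Kafka client. */
    static class ToyProducer {
        private volatile boolean closed;

        // Mirrors the closed check that both traces pass through.
        private void throwIfClosed() {
            if (closed) {
                throw new IllegalStateException(
                        "Cannot perform operation after producer has been closed");
            }
        }

        void send(String record) {
            throwIfClosed();
        }

        void abortTransaction() {
            throwIfClosed();
        }

        void close() {
            closed = true;
        }
    }

    /** Runs one "transaction" and returns the failures in the order they occur. */
    static List<String> runTransaction(ToyProducer producer) {
        List<String> failures = new ArrayList<>();
        try {
            producer.send("update");
        } catch (IllegalStateException sendFailure) {
            // First trace: the send fails because the producer is closed.
            failures.add("send: " + sendFailure.getMessage());
            try {
                // Second trace: the rollback aborts on the same closed producer.
                producer.abortTransaction();
            } catch (IllegalStateException abortFailure) {
                failures.add("abort: " + abortFailure.getMessage());
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        ToyProducer producer = new ToyProducer();
        producer.close(); // hypothetical: something closed the producer mid-flow
        runTransaction(producer).forEach(System.out::println);
    }
}
```

In this model the "Abort failed" log entry is a consequence of the first failure rather than a separate problem: the rollback reuses the producer that was already closed when the send was attempted.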

We have set the Kafka transaction ID prefix using the property spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix.

We have looked into this bug, which has since been closed: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/626. It describes a case where a single shared producer object was being destroyed, causing a similar error, so it seems related to what we are seeing.

We are currently using these versions:

'org.springframework.boot:2.7.11'
'org.springframework.cloud:spring-cloud-stream:3.2.10'
'org.springframework.cloud:spring-cloud-stream-binder-kafka:3.2.10'

But we still get the same issue occasionally when we try to produce messages within that transactional scope.

We have the following flow:

  1. Consume messages from a Kafka topic using a Consumer bean bound through the Spring Cloud Stream Kafka binder.
  2. Process each message within a @Transactional scope.
  3. During processing, perform a few JPA operations that update the database.
  4. Then publish a few updates to other systems using StreamBridge.

This flow works fine most of the time, but occasionally we get the error shown above.
