I'm trying to use a batch producer as described in the doc.
I've created the following Supplier:
@Bean
public Supplier<List<Message<String>>> fetch() {
return () -> List.of(
MessageBuilder.withPayload("message-1").build(),
MessageBuilder.withPayload("message-2").build(),
MessageBuilder.withPayload("message-3").build(),
MessageBuilder.withPayload("message-4").build());
}
In my understanding, each message should be sent independently. But I've the following exception:
2024-02-24T08:43:58.973+01:00 ERROR 2143477 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@3da51e74], failedMessage=GenericMessage [payload=[GenericMessage [payload=byte[7], headers={contentType=application/json, id=6d584b7d-b699-7fb6-3e8c-9c9363c68b30, timestamp=1708760638972}], GenericMessage [payload=byte[7], headers={contentType=application/json, id=c2515099-eb7c-ee10-a9ad-6a5370ad052c, timestamp=1708760638972}], GenericMessage [payload=byte[7], headers={contentType=application/json, id=836d0275-aceb-09da-1f6d-ee85e724725b, timestamp=1708760638972}], GenericMessage [payload=byte[7], headers={contentType=application/json, id=cbc18cbf-d3b1-47f9-a557-fa3fb7cc6379, timestamp=1708760638972}]], headers={id=50b82042-6d93-a0b2-62b5-d2793520d76a, timestamp=1708760638972}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler.handleMessage(KafkaMessageChannelBinder.java:1567)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1196)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:228)
at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:210)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:302)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:206)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:481)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:467)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:419)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:355)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:348)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class java.util.ArrayList to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1003)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:947)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1050)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:799)
at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:768)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:577)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:532)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:145)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
... 50 more
Caused by: java.lang.ClassCastException: class java.util.ArrayList cannot be cast to class [B (java.util.ArrayList and [B are in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1000)
... 58 more
It seems serialization-related, but I can send
List<String>: a unique message with the string list as jsonMessage<String>: a unique message with a single stringMessage<List<String>: same as the first one
So I'm doubting the serialization in itself is problematic. It must be related to batch producing. Or perhaps their combination ? The project use spring boot 3.2.2 and spring cloud stream 4.1.0.
I'm currently using a workaround, by sending message individualy with StreamBridge . It works fine. But I would like to understand why the functional approach is not working. Am I doing something wrong ? Or can that be a bug in spring cloud stream ?
If you must use
Listas the holder type, then you need to use aFunctioninstead of aSupplier. See below. However, I don't think that is your intention, as the function requires input from another destination.In this case, Spring Cloud Stream will send each individual item on the outbound as a single entity.
It will work if you can change your signature to
Messagefor the container type. See below.In your case, you are trying to produce a
List<Message>>,which is not supported, but if you can switch that toMessage<List>,then that works.