Spring Cloud Stream batch producer throws a `SerializationException`


I'm trying to use a batch producer as described in the doc.

I've created the following Supplier:

@Bean
public Supplier<List<Message<String>>> fetch() {
    return () -> List.of(
        MessageBuilder.withPayload("message-1").build(),
        MessageBuilder.withPayload("message-2").build(),
        MessageBuilder.withPayload("message-3").build(),
        MessageBuilder.withPayload("message-4").build());
}

As I understand it, each message should be sent independently, but I get the following exception:

2024-02-24T08:43:58.973+01:00 ERROR 2143477 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@3da51e74], failedMessage=GenericMessage [payload=[GenericMessage [payload=byte[7], headers={contentType=application/json, id=6d584b7d-b699-7fb6-3e8c-9c9363c68b30, timestamp=1708760638972}], GenericMessage [payload=byte[7], headers={contentType=application/json, id=c2515099-eb7c-ee10-a9ad-6a5370ad052c, timestamp=1708760638972}], GenericMessage [payload=byte[7], headers={contentType=application/json, id=836d0275-aceb-09da-1f6d-ee85e724725b, timestamp=1708760638972}], GenericMessage [payload=byte[7], headers={contentType=application/json, id=cbc18cbf-d3b1-47f9-a557-fa3fb7cc6379, timestamp=1708760638972}]], headers={id=50b82042-6d93-a0b2-62b5-d2793520d76a, timestamp=1708760638972}]
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
        at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
        at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler.handleMessage(KafkaMessageChannelBinder.java:1567)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1196)
        at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
        at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
        at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
        at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:228)
        at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:210)
        at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
        at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
        at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
        at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:302)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
        at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:206)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:481)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:467)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:419)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:355)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:348)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class java.util.ArrayList to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1003)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:947)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1050)
        at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:799)
        at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:768)
        at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:577)
        at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:532)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:145)
        at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
        ... 50 more
Caused by: java.lang.ClassCastException: class java.util.ArrayList cannot be cast to class [B (java.util.ArrayList and [B are in module java.base of loader 'bootstrap')
        at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
        at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1000)
        ... 58 more

It seems serialization-related, but I can send all of the following without problems:

  • List<String>: a single message with the string list as JSON
  • Message<String>: a single message with a single string
  • Message<List<String>>: same as the first one

So I doubt that serialization in itself is the problem. It must be related to batch producing, or perhaps to the combination of the two. The project uses Spring Boot 3.2.2 and Spring Cloud Stream 4.1.0.

I'm currently using a workaround: sending the messages individually with StreamBridge. It works fine, but I would like to understand why the functional approach is not working. Am I doing something wrong, or could this be a bug in Spring Cloud Stream?

1 answer

Answer by sobychacko

If you must use List as the outer (holder) type, you need to use a Function instead of a Supplier, as shown below. However, I don't think that is your intention, since a function only runs when it receives input from another destination.

@Bean
public Function<String, List<Message<String>>> batch() {
    return p -> {
        List<Message<String>> list = new ArrayList<>();
        list.add(MessageBuilder.withPayload("message - 1").build());
        list.add(MessageBuilder.withPayload("message - 2").build());
        list.add(MessageBuilder.withPayload("message - 3").build());
        list.add(MessageBuilder.withPayload("message - 4").build());
        return list;
    };
}

In this case, Spring Cloud Stream sends each item in the list to the outbound destination as a separate message.

With a Supplier, it will work if you can change the signature so that Message is the outer (container) type, as shown below.

@Bean
public Supplier<Message<List<String>>> batch() {
    return () -> {
        List<String> list = new ArrayList<>();
        list.add("message - 1");
        list.add("message - 2");
        list.add("message - 3");
        list.add("message - 4");
        return MessageBuilder.withPayload(list).build();
    };
}

In your case, the Supplier produces a List<Message<String>>, which is not supported. If you can switch that to Message<List<String>>, it works, although, as you already observed, the whole list then goes out as a single message.
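
As a rough illustration of why the trace in the question ends in a ClassCastException, here is a minimal sketch in plain Java, with no Kafka or Spring on the classpath. The names `ValueSerializer`, `PassThroughSerializer` and `trySerialize` are hypothetical and made up for this example; they are not Kafka APIs. The only assumption is the one the trace itself suggests: the list is not split, so the whole ArrayList reaches a serializer that expects a byte[].

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BatchSerializationSketch {

    /** Hypothetical stand-in for a value serializer; not the real Kafka interface. */
    interface ValueSerializer<T> {
        byte[] serialize(T data);
    }

    /** Mimics a byte[] serializer: it passes the bytes through unchanged. */
    static final class PassThroughSerializer implements ValueSerializer<byte[]> {
        @Override
        public byte[] serialize(byte[] data) {
            return data;
        }
    }

    /**
     * Hands a value to the serializer through its raw (erased) type, the way a
     * producer that only knows the serializer by class name would. The cast to
     * byte[] happens inside the compiler-generated bridge method.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String trySerialize(Object value) {
        ValueSerializer raw = new PassThroughSerializer();
        try {
            byte[] bytes = raw.serialize(value);
            return "ok: " + bytes.length + " bytes";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        byte[] single = "message-1".getBytes(StandardCharsets.UTF_8);

        // A single already-converted payload is what the serializer expects.
        System.out.println(trySerialize(single));

        // An unsplit batch arrives as a list, so the cast to byte[] fails.
        List<byte[]> batch = new ArrayList<>();
        batch.add(single);
        System.out.println(trySerialize(batch));
    }
}
```

The second call fails for the same reason as the innermost "Caused by" in the question: the serializer is handed the list itself rather than one payload at a time.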