class [B cannot be cast to class java.util.List ([B and java.util.List are in module java.base of loader 'bootstrap')


I get this error when I try to consume records from Kinesis: class [B cannot be cast to class java.util.List ([B and java.util.List are in module java.base of loader 'bootstrap'). It looks like the payload arrives as a byte[] rather than the List I expect.

Note: if I replace batch with record and change the consumer signature to receive a single message, Consumer<byte[]> fizzBuzzConsumer(), it works fine and I can convert the byte array to a String with no issues. So why does it not work in batch mode? Here is my configuration:

spring:
  profiles:
    active: local
  application:
    name: my-consumer
  cloud:
    function:
      definition: fizzBuzzConsumer
    stream:
      function:
        bindings:
          fizzBuzzConsumer-in-0: input
      bindings:
        input:
          consumer:
            batch-mode: true
            use-native-decoding: true
          destination: kinesis-writer-stream
          content-type: text/plain
          group: kinesis-reader-app-group

      kinesis:
        bindings:
          kinesis-writer-stream:
            consumer:
              listener-mode: batch
              checkpoint-mode: periodic
              checkpoint-interval: 3000
              idle-between-polls: ${KINESIS_CONSUMER_IDLE_BETWEEN_POLLS:1000}
              consumer-backoff: ${KINESIS_CONSUMER_BACKOFF:1000}
              records-limit: ${KINESIS_CONSUMER_RECORDS_LIMIT:2000}
              shard-iterator-type: TRIM_HORIZON
              worker-id: kinesis-reader-worker-id

        binder:
          checkpoint:
            table: kinesis-reader-stream-metadata
          locks:
            table: kinesis-reader-lock-registry
            lease-duration: 30
            refresh-period: 3000
            read-capacity: 10
          kpl-kcl-enabled: true
          auto-create-stream: true
          auto-add-shards: true
          min-shard-count: 1

And here is my consumer:

@Bean
public Consumer<Message<List<byte[]>>> fizzBuzzConsumer() {
    return message -> {
        for (byte[] record: message.getPayload()) {
            String json = new String(Objects.requireNonNull(record), StandardCharsets.UTF_8);
            log.info("New Record comes.... {}", json);
        }
    };
}

Error:

2022-11-03 22:22:13.496 ERROR 549022 --- [cTaskExecutor-4] s.i.a.i.k.KclMessageDrivenChannelAdapter : Got an exception during sending a 'GenericMessage [payload=byte[1165], headers={aws_shard=shardId-000000000000, id=d1433639-dcf9-53eb-9980-cc893406c3e8, sourceData=UserRecord [subSequenceNumber=0, explicitHashKey=null, aggregated=false, getSequenceNumber()=49634843198456367976521810433671650607349108380513861634, getData()=java.nio.HeapByteBuffer[pos=0 lim=1165 cap=1165], getPartitionKey()=1082619945], aws_receivedPartitionKey=1082619945, aws_receivedStream=kinesis-writer-stream, aws_receivedSequenceNumber=49634843198456367976521810433671650607349108380513861634, timestamp=1667510514911}]'
for the 'UserRecord [subSequenceNumber=0, explicitHashKey=null, aggregated=false, getSequenceNumber()=49634843198456367976521810433671650607349108380513861634, getData()=java.nio.HeapByteBuffer[pos=0 lim=1165 cap=1165], getPartitionKey()=1082619945]'.
Consider to use 'errorChannel' flow for the compensation logic.

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@4f4bbdbb]; nested exception is java.lang.ClassCastException: class [B cannot be cast to class java.util.List ([B and java.util.List are in module java.base of loader 'bootstrap')
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-5.5.15.jar:5.5.15]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65) ~[spring-integration-core-5.5.15.jar:5.5.15]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.5.15.jar:5.5.15]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.5.15.jar:5.5.15]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.15.jar:5.5.15]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.15.jar:5.5.15]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.15.jar:5.5.15]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.15.jar:5.5.15]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.23.jar:5.3.23]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.23.jar:5.3.23]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.23.jar:5.3.23]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.23.jar:5.3.23]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216) ~[spring-integration-core-5.5.15.jar:5.5.15]
    at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter.access$1600(KclMessageDrivenChannelAdapter.java:84) ~[spring-integration-aws-2.5.1.jar:na]
    at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.performSend(KclMessageDrivenChannelAdapter.java:520) ~[spring-integration-aws-2.5.1.jar:na]
    at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.processSingleRecord(KclMessageDrivenChannelAdapter.java:435) ~[spring-integration-aws-2.5.1.jar:na]
    at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.processRecords(KclMessageDrivenChannelAdapter.java:418) ~[spring-integration-aws-2.5.1.jar:na]
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.V1ToV2RecordProcessorAdapter.processRecords(V1ToV2RecordProcessorAdapter.java:42) ~[amazon-kinesis-client-1.14.8.jar:na]
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.callProcessRecords(ProcessTask.java:221) ~[amazon-kinesis-client-1.14.8.jar:na]
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:176) ~[amazon-kinesis-client-1.14.8.jar:na]
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49) ~[amazon-kinesis-client-1.14.8.jar:na]
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24) ~[amazon-kinesis-client-1.14.8.jar:na]
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.util.List ([B and java.util.List are in module java.base of loader 'bootstrap')
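
For readers following the trace: the logged message has payload=byte[1165] and the call path goes through processSingleRecord, so the consumer appears to receive one byte[] per message while its signature expects a List. One possible explanation, not confirmed in this thread, is that the binder-level batch setting is not being applied to this binding. The plain-Java sketch below only illustrates the cast failure and a defensive way to accept either shape. PayloadNormalizer, toRecords and decodeAll are hypothetical names, not part of Spring Cloud Stream or the Kinesis binder.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not a Spring or AWS API.
final class PayloadNormalizer {

    private PayloadNormalizer() {
    }

    // Accepts either a single byte[] (record mode) or a List of byte[] (batch mode)
    // and always returns a list, so the caller never casts blindly.
    static List<byte[]> toRecords(Object payload) {
        if (payload instanceof byte[]) {
            return List.of((byte[]) payload);
        }
        if (payload instanceof List<?>) {
            List<byte[]> records = new ArrayList<>();
            for (Object element : (List<?>) payload) {
                if (!(element instanceof byte[])) {
                    throw new IllegalArgumentException("Unexpected element type: "
                            + (element == null ? "null" : element.getClass().getName()));
                }
                records.add((byte[]) element);
            }
            return records;
        }
        throw new IllegalArgumentException("Unexpected payload type: "
                + (payload == null ? "null" : payload.getClass().getName()));
    }

    // Decodes every record as UTF-8 text, as the consumer in the question does.
    static List<String> decodeAll(Object payload) {
        List<String> decoded = new ArrayList<>();
        for (byte[] record : toRecords(payload)) {
            decoded.add(new String(record, StandardCharsets.UTF_8));
        }
        return decoded;
    }

    public static void main(String[] args) {
        Object single = "{\"n\":1}".getBytes(StandardCharsets.UTF_8);
        try {
            // Reproduces the failure from the question: a byte[] is not a List.
            List<?> broken = (List<?>) single;
            System.out.println(broken.size());
        } catch (ClassCastException e) {
            System.out.println("ClassCastException reproduced");
        }
        // The same payload handled without a blind cast.
        System.out.println(decodeAll(single));
    }
}
```

A helper like this does not make batch mode work. It only keeps the consumer from failing while the configuration is being investigated, and logging which branch of toRecords is taken shows whether batches ever arrive.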

There are 2 answers

Vu Tran On

I faced the same problem, although my code is not exactly like yours.

My consumer is of type Consumer<Event<Integer, Product>>. The error is: [B is in module java.base of loader 'bootstrap'; se.magnus.api.event.Event is in the unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader.

In my case, adding a no-arguments constructor to the Event class was enough to make it work.
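
A minimal sketch of what that change might look like, assuming the payload is converted from JSON by a mapper that first creates an empty instance through the default constructor and then populates it. The Event class here is a hypothetical stand-in (its field names are assumptions, not the real se.magnus.api.event.Event), and hasUsableNoArgConstructor is an illustrative helper that only mimics the reflective instantiation step.

```java
final class EventDemo {

    private EventDemo() {
    }

    // Hypothetical stand-in for the Event<K, T> class; field names are assumptions.
    public static class Event<K, T> {
        private K key;
        private T data;

        // No-arguments constructor: lets a reflective mapper create an empty
        // instance before setting the fields.
        public Event() {
        }

        public Event(K key, T data) {
            this.key = key;
            this.data = data;
        }

        public K getKey() { return key; }
        public void setKey(K key) { this.key = key; }
        public T getData() { return data; }
        public void setData(T data) { this.data = data; }
    }

    // Counter-example: only an all-arguments constructor is available.
    public static class EventWithoutDefault {
        private final String key;

        public EventWithoutDefault(String key) {
            this.key = key;
        }

        public String getKey() { return key; }
    }

    // Mimics the instantiation step of a reflective deserializer.
    static boolean hasUsableNoArgConstructor(Class<?> type) {
        try {
            type.getDeclaredConstructor().newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasUsableNoArgConstructor(Event.class));
        System.out.println(hasUsableNoArgConstructor(EventWithoutDefault.class));
    }
}
```

When the conversion to the target type cannot happen, the function may end up receiving the raw byte[] instead, which would match the [B in the error message.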

Simas Joneliunas On

For me (using Spring Boot 3.0.2) the error was, ironically, caused by how the message was sent to RabbitMQ. On the producer side, I was sending the messages through the standard RabbitTemplate and not Spring's StreamBridge.

While I am not entirely familiar with Spring Cloud Stream's message decoding feature, RabbitTemplate in its standard configuration sends a base64-encoded Java object that includes its package name.

This also persists if RabbitTemplate is set to use JSON, i.e.

var eventPayload = new SomeEventDto(12332, "some string val", "another string val");
template.setMessageConverter(new Jackson2JsonMessageConverter());
template.convertAndSend("rabbit-stream-exchange", "rabbit-stream-queue", eventPayload);

Sending the message with the code above adds an additional header to the RabbitMQ message:

__TypeId__: com.example.example123.SomeEventDto

If the Spring Cloud Stream receiver gets a message with the __TypeId__ header set, it fails to cast the message into the correct object, seemingly even if the header names the exact type it is supposed to deserialize into. If the same payload is sent (manually) without this header, with all other parameters unchanged, the message is consumed successfully.

Thus, my fix was to send the message through Spring Cloud Stream instead:

@Service
@RequiredArgsConstructor
public class EventPublisher {
  private final StreamBridge streamBridge;

  public void sendSomeEvent(SomeEventDto event) throws IOException {
    streamBridge.send("some-event-binding-out-0", MessageBuilder.withPayload(event).build());
  }
}

The message sent this way was serialized as JSON and sent without the `__TypeId__` header which was confusing the consumer so much.