I get this error when I try to consume records from Kinesis: class [B cannot be cast to class java.util.List ([B and java.util.List are in module java.base of loader 'bootstrap'), so it looks like the payload is arriving as a different type than the consumer expects.
Note: if I replace batch with record and change the consumer signature to receive a single message, Consumer<byte[]> fizzBuzzConsumer(), it works fine and I can convert the byte array to a string with no issues. So why does it not work in batch mode?
spring:
  profiles:
    active: local
  application:
    name: my-consumer
  cloud:
    function:
      definition: fizzBuzzConsumer
    stream:
      function:
        bindings:
          fizzBuzzConsumer-in-0: input
      bindings:
        input:
          consumer:
            batch-mode: true
            use-native-decoding: true
          destination: kinesis-writer-stream
          content-type: text/plain
          group: kinesis-reader-app-group
      kinesis:
        bindings:
          kinesis-writer-stream:
            consumer:
              listener-mode: batch
              checkpoint-mode: periodic
              checkpoint-interval: 3000
              idle-between-polls: ${KINESIS_CONSUMER_IDLE_BETWEEN_POLLS:1000}
              consumer-backoff: ${KINESIS_CONSUMER_BACKOFF:1000}
              records-limit: ${KINESIS_CONSUMER_RECORDS_LIMIT:2000}
              shard-iterator-type: TRIM_HORIZON
              worker-id: kinesis-reader-worker-id
        binder:
          checkpoint:
            table: kinesis-reader-stream-metadata
          locks:
            table: kinesis-reader-lock-registry
            lease-duration: 30
            refresh-period: 3000
            read-capacity: 10
          kpl-kcl-enabled: true
          auto-create-stream: true
          auto-add-shards: true
          min-shard-count: 1
And here is my consumer:
@Bean
public Consumer<Message<List<byte[]>>> fizzBuzzConsumer() {
    return message -> {
        for (byte[] record : message.getPayload()) {
            String json = new String(Objects.requireNonNull(record), StandardCharsets.UTF_8);
            log.info("New Record comes.... {}", json);
        }
    };
}
Error:
2022-11-03 22:22:13.496 ERROR 549022 --- [cTaskExecutor-4] s.i.a.i.k.KclMessageDrivenChannelAdapter : Got an exception during sending a 'GenericMessage [payload=byte[1165], headers={aws_shard=shardId-000000000000, id=d1433639-dcf9-53eb-9980-cc893406c3e8, sourceData=UserRecord [subSequenceNumber=0, explicitHashKey=null, aggregated=false, getSequenceNumber()=49634843198456367976521810433671650607349108380513861634, getData()=java.nio.HeapByteBuffer[pos=0 lim=1165 cap=1165], getPartitionKey()=1082619945], aws_receivedPartitionKey=1082619945, aws_receivedStream=kinesis-writer-stream, aws_receivedSequenceNumber=49634843198456367976521810433671650607349108380513861634, timestamp=1667510514911}]'
for the 'UserRecord [subSequenceNumber=0, explicitHashKey=null, aggregated=false, getSequenceNumber()=49634843198456367976521810433671650607349108380513861634, getData()=java.nio.HeapByteBuffer[pos=0 lim=1165 cap=1165], getPartitionKey()=1082619945]'.
Consider to use 'errorChannel' flow for the compensation logic.
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@4f4bbdbb]; nested exception is java.lang.ClassCastException: class [B cannot be cast to class java.util.List ([B and java.util.List are in module java.base of loader 'bootstrap')
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.23.jar:5.3.23]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.23.jar:5.3.23]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.23.jar:5.3.23]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.23.jar:5.3.23]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216) ~[spring-integration-core-5.5.15.jar:5.5.15]
at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter.access$1600(KclMessageDrivenChannelAdapter.java:84) ~[spring-integration-aws-2.5.1.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.performSend(KclMessageDrivenChannelAdapter.java:520) ~[spring-integration-aws-2.5.1.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.processSingleRecord(KclMessageDrivenChannelAdapter.java:435) ~[spring-integration-aws-2.5.1.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.processRecords(KclMessageDrivenChannelAdapter.java:418) ~[spring-integration-aws-2.5.1.jar:na]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.V1ToV2RecordProcessorAdapter.processRecords(V1ToV2RecordProcessorAdapter.java:42) ~[amazon-kinesis-client-1.14.8.jar:na]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.callProcessRecords(ProcessTask.java:221) ~[amazon-kinesis-client-1.14.8.jar:na]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:176) ~[amazon-kinesis-client-1.14.8.jar:na]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49) ~[amazon-kinesis-client-1.14.8.jar:na]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24) ~[amazon-kinesis-client-1.14.8.jar:na]
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.util.List ([B and java.util.List are in module java.base of loader 'bootstrap')
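Two details in the log stand out: the message is logged with payload=byte[1165], and the trace goes through KclMessageDrivenChannelAdapter$RecordProcessor.processSingleRecord. That suggests the adapter is still handing over one record at a time, so the function receives a byte[] where it expects a List. One thing worth checking (an assumption, not confirmed in this thread) is whether the Kinesis-specific consumer properties should be keyed by the binding name (input) rather than by the destination (kinesis-writer-stream). While diagnosing, a payload-agnostic decoder can keep the consumer from failing on the cast. The sketch below is plain Java; PayloadDecoder and decode are hypothetical names, not part of Spring Cloud Stream or the Kinesis binder.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for diagnosis only; not part of any Spring or AWS library.
final class PayloadDecoder {

    private PayloadDecoder() {
    }

    // Accepts either a single record (byte[]) or a batch (List of byte[])
    // and returns the UTF-8 decoded records in their original order.
    static List<String> decode(Object payload) {
        if (payload instanceof byte[]) {
            // Record mode: one record per message.
            return Collections.singletonList(new String((byte[]) payload, StandardCharsets.UTF_8));
        }
        if (payload instanceof List<?>) {
            // Batch mode: every element is expected to be a byte[].
            List<String> decoded = new ArrayList<>();
            for (Object element : (List<?>) payload) {
                if (!(element instanceof byte[])) {
                    throw new IllegalArgumentException("Unexpected batch element: " + describe(element));
                }
                decoded.add(new String((byte[]) element, StandardCharsets.UTF_8));
            }
            return decoded;
        }
        throw new IllegalArgumentException("Unexpected payload: " + describe(payload));
    }

    private static String describe(Object value) {
        return value == null ? "null" : value.getClass().getName();
    }
}
```

Assuming the function is declared with an untyped payload, for example Consumer<Message<?>>, the body could call PayloadDecoder.decode(message.getPayload()) and log each returned string. If only the single-record branch is ever hit, batch listener mode is probably not being applied to the binding.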
I faced the same problem, although my code is not exactly like yours.
My consumer is of type Consumer<Event<Integer, Product>>. The error is:
[B is in module java.base of loader 'bootstrap'; se.magnus.api.event.Event is in the unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader.
In my case, adding a no-argument constructor to the Event class was enough to make it work.
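A likely explanation, offered as an assumption rather than something verified here, is that JSON deserialization of the payload fails when the target class cannot be instantiated, so the raw byte[] is passed on and the cast to Event fails. Jackson's default bean deserialization needs a no-argument constructor unless a creator is declared explicitly. A minimal sketch of such a class follows; the field names are illustrative and may not match the real se.magnus.api.event.Event.

```java
// Illustrative event type; the fields are assumptions, not the actual class.
class Event<K, T> {

    private K key;
    private T data;

    // No-argument constructor so a JSON mapper can create an empty instance
    // and then populate it through the setters.
    Event() {
    }

    Event(K key, T data) {
        this.key = key;
        this.data = data;
    }

    public K getKey() {
        return key;
    }

    public void setKey(K key) {
        this.key = key;
    }

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }
}
```

If the fields have to stay final, the no-argument constructor can assign null to each of them instead of relying on setters.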