I have a Spring Cloud Stream project with Kafka and Protobuf, the regular version works fine.
After compile with Graalvm it failed to producer Protobuf messages , the consumer works fine.
My project yaml is :
spring.cloud.stream:
instanceCount: 1
default-binder: kafka
default:
producer.useNativeEncoding: true
consumer.useNativeEncoding: true
kafka:
binder:
auto-create-topics: false
brokers: ${spring.kafka.properties.bootstrap.servers}
consumer-properties:
schema.registry.url: ${spring.kafka.properties.schema.registry.url}
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
specific.protobuf.value.type: proto.dataFusion.VehicleInfo
auto.offset.reset: latest
auto.commit.enable: false
ack.mode: manual_immediate
isolation.level: read_committed
max.poll.records: 150
fetch.min.bytes: 1048576 # 1MB
fetch.max.wait.ms: 1000
producer-properties:
schema.registry.url: ${spring.kafka.properties.schema.registry.url}
value.serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
enable.idempotence: false
I had a simular issue for consumer that was fixed after I set specific.protobuf.value.type property.
If value.serializer of the producer is set to String it works fine in the compiled version, but fail with Protobuf Serializer
But for the producer was no able to fix , I having the error :
2024-01-25T08:50:05.062-03:00 ERROR 8025 --- [sp-sc-cfc] [ main] o.s.cloud.stream.binding.BindingService : Failed to create producer binding; retrying in 30 seconds
org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder checking the topic (cfc_plates_to_check_muniz):
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:684) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionInfoForProducer(KafkaTopicProvisioner.java:600) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:422) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:168) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:310) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:102) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:153) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:353) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:294) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:311) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:315) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:115) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58) ~[sp-sc-cfc:4.1.0]
at [email protected]/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:647) ~[na:na]
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:59) ~[sp-sc-cfc:4.1.0]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:284) ~[sp-sc-cfc:6.1.1]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:467) ~[sp-sc-cfc:6.1.1]
at [email protected]/java.lang.Iterable.forEach(Iterable.java:75) ~[sp-sc-cfc:na]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:256) ~[sp-sc-cfc:6.1.1]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:201) ~[sp-sc-cfc:6.1.1]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:965) ~[sp-sc-cfc:6.1.1]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:619) ~[sp-sc-cfc:6.1.1]
at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.refresh(ReactiveWebServerApplicationContext.java:66) ~[sp-sc-cfc:3.2.0]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:753) ~[sp-sc-cfc:3.2.0]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:455) ~[sp-sc-cfc:3.2.0]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:323) ~[sp-sc-cfc:3.2.0]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1342) ~[sp-sc-cfc:3.2.0]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1331) ~[sp-sc-cfc:3.2.0]
at io.mobi7.sc.cfc.CFCServiceApplication.main(CFCServiceApplication.java:23) ~[sp-sc-cfc:na]
Caused by: java.lang.RuntimeException: Failed to obtain partition information for the topic cfc_plates_to_check_muniz
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.lambda$getPartitionsForTopic$9(KafkaTopicProvisioner.java:657) ~[sp-sc-cfc:4.1.0]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[sp-sc-cfc:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:209) ~[sp-sc-cfc:na]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:622) ~[sp-sc-cfc:4.1.0]
... 28 common frames omitted
2024-01-25T08:50:05.064-03:00 ERROR 8025 --- [sp-sc-cfc] [ scheduling-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'sp-sc-cfc.platesToCheck-out-0'., failedMessage=GenericMessage [payload=plate: "SIXXXXX"
, headers={sequenceNumber=1, correlationId=63e8b192-568c-c9b1-0867-52deb3eefce4, id=672f42d3-a458-0c59-b2e4-5841806869f2, sequenceSize=0, timestamp=1706183405063}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:228)
at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:210)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:501)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:356)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:285)
at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:317)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:249)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:151)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:302)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:206)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:481)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:467)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:419)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:355)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:348)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96)
at [email protected]/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at [email protected]/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at [email protected]/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at [email protected]/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at [email protected]/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at [email protected]/java.lang.Thread.run(Thread.java:840)
at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:775)
at org.graalvm.nativeimage.builder/com.oracle.svm.core.posix.thread.PosixPlatformThreads.pthreadStartRoutine(PosixPlatformThreads.java:203)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=plate: "SIE7D4K"
, headers={sequenceNumber=1, correlationId=63e8b192-568c-c9b1-0867-52deb3eefce4, id=672f42d3-a458-0c59-b2e4-5841806869f2, sequenceSize=0, timestamp=1706183405063}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
... 63 more
Here is the kafka and protobuf hints using :
hints.reflection().registerType(NullContextNameStrategy.class, MemberCategory.INVOKE_DECLARED_METHODS);
hints.reflection().registerType(NullContextNameStrategy.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(TopicNameStrategy.class, MemberCategory.INVOKE_DECLARED_METHODS);
hints.reflection().registerType(TopicNameStrategy.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(Schema.class, MemberCategory.INVOKE_DECLARED_METHODS);
hints.reflection().registerType(Schema.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(SchemaString.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(SchemaString.class, MemberCategory.INVOKE_DECLARED_METHODS);
hints.reflection().registerType(SchemaReference.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(SchemaReference.class, MemberCategory.INVOKE_DECLARED_METHODS);
//Serializers
hints.reflection().registerType(KafkaProtobufDeserializer.class, MemberCategory.INVOKE_DECLARED_METHODS);
hints.reflection().registerType(KafkaProtobufDeserializer.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(KafkaProtobufSerializer.class, MemberCategory.INVOKE_DECLARED_METHODS);
hints.reflection().registerType(KafkaProtobufSerializer.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
//To create Protobuf
hints.reflection().registerType(DescriptorProtos.FieldOptions.Builder.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(DescriptorProtos.FieldOptions.Builder.class, MemberCategory.INVOKE_DECLARED_METHODS);
hints.reflection().registerType(DescriptorProtos.FieldOptions.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(DescriptorProtos.FieldOptions.class, MemberCategory.INVOKE_DECLARED_METHODS);
hints.reflection().registerType(DescriptorProtos.FieldOptions.newBuilder().getCtype().getClass(), MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(DescriptorProtos.FieldOptions.newBuilder().getCtype().getClass(), MemberCategory.INVOKE_DECLARED_METHODS);
hints.reflection().registerType(DescriptorProtos.FieldOptions.newBuilder().getJstype().getClass(), MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(DescriptorProtos.FieldOptions.newBuilder().getJstype().getClass(), MemberCategory.INVOKE_DECLARED_METHODS);
hints.reflection().registerType(DescriptorProtos.FieldOptions.OptionRetention.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(DescriptorProtos.FieldOptions.OptionRetention.class, MemberCategory.INVOKE_DECLARED_METHODS);
hints.reflection().registerType(DescriptorProtos.FieldOptions.OptionTargetType.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(DescriptorProtos.FieldOptions.OptionTargetType.class, MemberCategory.INVOKE_DECLARED_METHODS);
hints.reflection().registerType(DescriptorProtos.UninterpretedOption.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(DescriptorProtos.UninterpretedOption.class, MemberCategory.INVOKE_DECLARED_METHODS);
hints.reflection().registerType(DescriptorProtos.MessageOptions.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(DescriptorProtos.MessageOptions.class, MemberCategory.INVOKE_DECLARED_METHODS);
hints.reflection().registerType(DescriptorProtos.MessageOptions.Builder.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(DescriptorProtos.MessageOptions.Builder.class, MemberCategory.INVOKE_DECLARED_METHODS);
hints.reflection().registerType(DescriptorProtos.FileOptions.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(DescriptorProtos.FileOptions.class, MemberCategory.INVOKE_DECLARED_METHODS);
hints.reflection().registerType(DescriptorProtos.FileOptions.Builder.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(DescriptorProtos.FileOptions.Builder.class, MemberCategory.INVOKE_DECLARED_METHODS);
hints.reflection().registerType(DescriptorProtos.FileOptions.OptimizeMode.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(DescriptorProtos.FileOptions.OptimizeMode.class, MemberCategory.INVOKE_DECLARED_METHODS);
hints.reflection().registerType(DescriptorProtos.EnumValueOptions.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(DescriptorProtos.EnumValueOptions.class, MemberCategory.INVOKE_DECLARED_METHODS);
hints.reflection().registerType(DescriptorProtos.EnumValueOptions.Builder.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(DescriptorProtos.EnumValueOptions.Builder.class, MemberCategory.INVOKE_DECLARED_METHODS);
hints.reflection().registerType(DescriptorProtos.EnumOptions.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(DescriptorProtos.EnumOptions.class, MemberCategory.INVOKE_DECLARED_METHODS);
hints.reflection().registerType(DescriptorProtos.EnumOptions.Builder.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(DescriptorProtos.EnumOptions.Builder.class, MemberCategory.INVOKE_DECLARED_METHODS);
There is some others hits for the business ProtoBuf as well
I tried set a specific.protobuf.value.type to the produce.
I expect producer works in a compiled version like the interpreter one's
This is likely because we are missing a native hint for
io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer.See the hint forJsonSerializerin Spring for Apache Kafka, for example. You can try adding a hint forKafkaProtobufSerializerin your application. If it works, add the hint to Oracle's graalvm-reachability-metadata repository by sending a PR there. Specifically, this is where the hint needs to get added. This way, not all applications don't have to add this custom hint in the applications.Example code in : https://github.com/mmuniz75/scs-producer-graalvm
Update based on debugging the sample app: You are missing a native hint for
io.confluent.kafka.serializers.subject.DefaultReferenceSubjectNameStrategy. You need the following hints:See this for more details:https://github.com/spring-cloud/spring-cloud-stream/issues/2896#issuecomment-1928598548