I am using spring cloud stream and function with Kafka binder. In case of error records, Records will be published to DLQ. The number of partitions of the original and DLQ topic are not same. So, in this case, a bean of type DlaPartitonFunction to be defined. (Below is use to default key parititoner of kafka)

 @Bean
    public DlqPartitionFunction partitionFunction() {
        return (group, record, ex) -> null;
    }

Eventhough we provide this bean, This is bean is not referenced/injected in KafkaMessageChannelBinder bean

KafkaMessageChannelBinder kafkaMessageChannelBinder(
            KafkaBinderConfigurationProperties configurationProperties,
            KafkaTopicProvisioner provisioningProvider,
            @Nullable ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> listenerContainerCustomizer,
            @Nullable MessageSourceCustomizer<KafkaMessageSource<?, ?>> sourceCustomizer,
            @Nullable ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<?, ?>> messageHandlerCustomizer,
            @Nullable ConsumerEndpointCustomizer<KafkaMessageDrivenChannelAdapter<?, ?>> consumerCustomizer,
            ObjectProvider<KafkaBindingRebalanceListener> rebalanceListener,
            ObjectProvider<DlqPartitionFunction> dlqPartitionFunction, // Not referenced
            ObjectProvider<DlqDestinationResolver> dlqDestinationResolver,
            ObjectProvider<ClientFactoryCustomizer> clientFactoryCustomizer,
            ObjectProvider<ConsumerConfigCustomizer> consumerConfigCustomizer,
            ObjectProvider<ProducerConfigCustomizer> producerConfigCustomizer,
            ProducerListener producerListener, KafkaExtendedBindingProperties kafkaExtendedBindingProperties
            ) {}

Am i missing anything apart from defining a bean?

0

There are 0 answers