Service details
To construct my example I've used
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.2</version>
<relativePath/>
</parent>
and
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>2023.0.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
to build stream I've used
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
My yml configuration is
spring:
cloud:
function:
definition: report
stream:
bindings:
report-in-0:
destination: ${kafka-conf.consume-topic-report}
group: ${kafka-conf.group}-report-78107151
consumer:
enableDlq: true
dlqName: ${kafka-conf.report-dlq}
report-out-0:
destination: ${kafka-conf.produce-topic-report}
kafka:
binder:
brokers: ${kafka-conf.server}
schemaRegistryUrl: ${kafka-conf.schema}
default:
producer:
configuration:
schema.registry.url: ${kafka-conf.schema}
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
consumer:
configuration:
schema.registry.url: ${kafka-conf.schema}
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
specific.avro.reader: true
bindings:
report-in-0:
consumer:
configuration:
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
specific.avro.reader: true
Expectations
If stream gets correct message
- it should be processed;
- the processed result should be published to
report_topic_redirect; - no errors;
If stream gets incorrect message
- log error;
- send corrupted message to
DLQtopic (in provided configuration it'sreport_dlq); - no blocking for next messages;
Inconsistency of the result with expectations
- no published messages in the
DLQtopic (report_dlq)
Question
It seems I've correctly configured the error handler (because after logging error the corrupted message is not handled recursively blocking further processing), but my DLQ configuration is corrupted, is any ideas what can be wrong here?
P.S.
Possible I need something like a deserializationExceptionHandler: sendToDlq but I didn't detect this property in documentation.