Spring Cloud Stream Kafka binder: no messages in DLQ topic with the Leyton release train


Service details

To construct my example I've used

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.2</version>
        <relativePath/>
    </parent>

and

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>2023.0.0</version>
        <type>pom</type>
        <scope>import</scope>
    </dependency>

To build the stream I've used

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>

My YAML configuration is

    spring:
      cloud:
        function:
          definition: report
        stream:
          bindings:
            report-in-0:
              destination: ${kafka-conf.consume-topic-report}
              group: ${kafka-conf.group}-report-78107151
              consumer:
                enableDlq: true
                dlqName: ${kafka-conf.report-dlq}
            report-out-0:
              destination: ${kafka-conf.produce-topic-report}
          kafka:
            binder:
              brokers: ${kafka-conf.server}
              schemaRegistryUrl: ${kafka-conf.schema}
            default:
              producer:
                configuration:
                  schema.registry.url: ${kafka-conf.schema}
                  key.serializer: org.apache.kafka.common.serialization.StringSerializer
                  value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                  value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
              consumer:
                configuration:
                  schema.registry.url: ${kafka-conf.schema}
                  key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                  value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
                  value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
                  specific.avro.reader: true
            bindings:
              report-in-0:
                consumer:
                  configuration:
                    value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
                    spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
                    specific.avro.reader: true
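For context, `spring.cloud.function.definition: report` together with the `report-in-0`/`report-out-0` bindings implies a `Function` bean named `report`. A minimal stand-in sketch of that bean (the payload types here are hypothetical placeholders for the Avro-generated classes, which are not shown in the question):

```java
import java.util.function.Function;

// Hypothetical payload types standing in for the Avro-generated classes.
record ReportRequest(String id) {}
record ReportResult(String id, String status) {}

class ReportFunction {
    // In the real service this would be a @Bean named "report" so that
    // spring.cloud.function.definition: report binds it to the topics above.
    static Function<ReportRequest, ReportResult> report() {
        return request -> new ReportResult(request.id(), "PROCESSED");
    }
}
```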

Expectations

If the stream gets a correct message

  • it should be processed;
  • the processed result should be published to report_topic_redirect;
  • no errors;

If the stream gets an incorrect message

  • log the error;
  • send the corrupted message to the DLQ topic (in the provided configuration that's report_dlq);
  • no blocking of subsequent messages;
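The non-blocking expectation relies on the `ErrorHandlingDeserializer` configured above: it delegates to the real deserializer and catches failures so the poll loop is not stuck retrying the same record. A simplified plain-Java sketch of that idea (not the actual spring-kafka implementation):

```java
import java.util.function.Function;

// Simplified model of what an error-handling deserializer does:
// try the delegate, and on failure return a marker instead of throwing,
// so consumption continues and the record can be routed elsewhere (e.g. a DLQ).
class SafeDeserializer<T> {
    private final Function<byte[], T> delegate;

    SafeDeserializer(Function<byte[], T> delegate) {
        this.delegate = delegate;
    }

    // Returns null on failure; the real ErrorHandlingDeserializer also stores
    // the exception in a record header so the container's error handler can act on it.
    T deserialize(byte[] data) {
        try {
            return delegate.apply(data);
        } catch (RuntimeException ex) {
            return null;
        }
    }
}
```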

Actual behaviour vs. expectations

  • no messages are published to the DLQ topic (report_dlq)

Question

It seems the error handler itself is configured correctly (after the error is logged, the corrupted message is not retried recursively and does not block further processing), but my DLQ configuration appears to be broken. Any ideas what could be wrong here?

P.S.

Possibly I need something like deserializationExceptionHandler: sendToDlq, but I couldn't find this property in the documentation.
