prevent sending message to unknown kafka topic when topic is defined programatically in Quarkus smallrye

38 views Asked by At

I'm building a Quarkus service for being a middleware in charge of sending kafka events exposing a POST endpoint. As many other services use it, I have declared only one outgoing channel where topic is assigned programatically as below emitter

I want to apply any defensive mechanism to prevent pod keeps trying before get exhausted when topic name is wrong (currently pod tries like 6kish)

I tried different approaches setting up properties in application.yaml although none of them worked (retries, failure-strategy, health-topic-verification-enabled and so), as shown on below yaml snippet

Let me upload 2 screenshots , I have had to remove all matches for UNKNOWN_TOPIC_OR_PARTITION} on screenshot1 because otherwise I was not able to show you the whole flow

enter image description here

between above lines and even after than last line testing NACK there are 6k logs like below

enter image description here

yaml snippet

mp:
  messaging:
    outgoing:
      kafka-metrics:
        value:
          serializer: io.quarkus.kafka.client.serialization.ObjectMapperSerializer
        connector: smallrye-kafka
        merge: true

"%prod":
  quarkus:
    quartz:
      # Ensure tasks only run once even with multiple instances of the application running
      clustered: true
  mp:
    messaging:
      outgoing:
        kafka-metrics:
          retries: 5
          health-enabled: true
          health-topic-verification-enabled: true
          health-readiness-enabled: true
          failure-strategy: ignore
          health-readiness-topic-verification: true
          bootstrap:
            servers: ${DC_KAFKA_BOOTSTRAP_SERVERS}
          security:
            protocol: SSL
          ssl:
            keystore:
              location: ${DC_KAFKA_KEYSTORE_LOCATION}
              password: ${DC_KAFKA_KEYSTORE_PASSWORD}
            truststore:
              location: ${DC_KAFKA_TRUSTSTORE_LOCATION}

Emitter

    @ApplicationScoped
    @Slf4j
    public class KafkaMetricsProducer implements MetricsProducer {
    private final Emitter<JsonNode> metricsMessageEmitter;

    public KafkaMetricsProducer(@Channel("kafka-metrics") Emitter<JsonNode> 
  metricsMessageEmitter) {
    this.metricsMessageEmitter = metricsMessageEmitter;
  }

  @Override
  @OnOverflow(OnOverflow.Strategy.DROP)
  public void send(GenericMetricEvent metric) {

    log.info(
        "Sending {} of schema version {} with key {} to {}",
        metric.getEventType(),
        metric.getSchemaVersion(),
        metric.getKey(),
        metric.getTopic());
    metricsMessageEmitter.send(
        Message.of(
                metric.getPayload(),
                () -> {
                  log.info("testing ACK logging");
                  return CompletableFuture.completedFuture(null);
                },
                reason -> {
                  log.info(
                      "testing NACK logging with failure reason cause {} , reason msg {}",
                      reason.getCause(),
                      reason.getMessage(),
                      reason.getStackTrace());

                  return CompletableFuture.completedFuture(null);
                })
            .addMetadata(
                OutgoingKafkaRecordMetadata.<String>builder()
                    .withTopic(metric.getTopic())
                    .withKey(metric.getKey())
                    .withTimestamp(Instant.now())
                    .withHeaders(
                        new RecordHeaders()
                            .add("schemaVersion", metric.getSchemaVersion().getBytes(UTF_8))
                            .add("messageId", MDC.get(DC_CORRELATION_ID_HEADER).getBytes(UTF_8))
                            .add("messageType", metric.getEventType().getBytes(UTF_8))
                            .add(
                                "eventTimestamp",
                                metric.getEventTimestamp().toString().getBytes(UTF_8)))
                    .build()));
    log.info(
        "Sent {} of schema version {} with key {} to {}",
        metric.getEventType(),
        metric.getSchemaVersion(),
        metric.getKey(),
        metric.getTopic());
  }
0

There are 0 answers