I'm building a Quarkus service that acts as middleware for sending Kafka events, exposing a POST endpoint. Since many other services use it, I have declared only one outgoing channel, and the topic is assigned programmatically, as shown in the emitter below.
I want to apply a defensive mechanism to prevent the pod from retrying until it is exhausted when the topic name is wrong (currently the pod retries roughly 6,000 times).
I tried several approaches, setting properties in application.yaml, but none of them worked (retries, failure-strategy, health-topic-verification-enabled, and so on), as shown in the YAML snippet below.
Let me upload two screenshots. I had to remove all matches for UNKNOWN_TOPIC_OR_PARTITION from screenshot 1, because otherwise I was not able to show you the whole flow.
Between the lines above — and even after that last "testing NACK" line — there are about 6k log entries like the one below.
YAML snippet:
# NOTE(review): indentation was lost in this paste — the nesting below (e.g. where
# 'bootstrap'/'security'/'ssl' hang) cannot be recovered from the text alone; confirm
# against the real application.yaml.
mp:
messaging:
outgoing:
kafka-metrics:
value:
serializer: io.quarkus.kafka.client.serialization.ObjectMapperSerializer
connector: smallrye-kafka
merge: true
"%prod":
quarkus:
quartz:
# Ensure tasks only run once even with multiple instances of the application running
clustered: true
mp:
messaging:
outgoing:
kafka-metrics:
# NOTE(review): 'retries' maps to the Kafka producer 'retries' setting, which governs
# re-sends after a failed produce attempt. With an unknown topic the producer instead
# loops fetching metadata until 'delivery.timeout.ms' (default 120000 ms) and/or
# 'max.block.ms' (default 60000 ms) elapse, backing off 'retry.backoff.ms' (default
# 100 ms) between attempts — that loop, not this key, is what produces the ~6k log
# lines. To bound it, pass producer properties through the channel, e.g.
# 'delivery.timeout.ms: 5000' and 'max.block.ms: 5000' — confirm against the
# SmallRye Kafka connector documentation.
retries: 5
health-enabled: true
# NOTE(review): topic verification only checks topics known from configuration; a
# topic chosen per message via OutgoingKafkaRecordMetadata is presumably not
# verified here — confirm.
health-topic-verification-enabled: true
health-readiness-enabled: true
# NOTE(review): 'failure-strategy' is an attribute of *incoming* smallrye-kafka
# channels (dead-letter-queue/ignore/fail on consumption); it has no effect on an
# outgoing channel — verify and remove.
failure-strategy: ignore
# NOTE(review): looks like the deprecated alias of
# 'health-topic-verification-enabled' above — one of the two should go.
health-readiness-topic-verification: true
bootstrap:
servers: ${DC_KAFKA_BOOTSTRAP_SERVERS}
security:
protocol: SSL
ssl:
keystore:
location: ${DC_KAFKA_KEYSTORE_LOCATION}
password: ${DC_KAFKA_KEYSTORE_PASSWORD}
truststore:
location: ${DC_KAFKA_TRUSTSTORE_LOCATION}
Emitter
// Publishes metric events to Kafka through the single "kafka-metrics" outgoing
// channel; the destination topic is chosen per message via
// OutgoingKafkaRecordMetadata in send(...), so one channel serves many callers.
@ApplicationScoped
@Slf4j
public class KafkaMetricsProducer implements MetricsProducer {
// Emitter bound to the "kafka-metrics" channel, injected through the constructor.
private final Emitter<JsonNode> metricsMessageEmitter;
/**
 * Creates the producer around the single configured outgoing channel.
 *
 * @param emitter emitter bound to the {@code kafka-metrics} channel
 */
public KafkaMetricsProducer(@Channel("kafka-metrics") Emitter<JsonNode> emitter) {
    this.metricsMessageEmitter = emitter;
}
/**
 * Hands one metric event to the "kafka-metrics" channel. The destination topic,
 * key, timestamp and headers are attached per message via
 * {@link OutgoingKafkaRecordMetadata}; delivery outcome is reported asynchronously
 * through the ack/nack callbacks.
 */
@Override
// NOTE(review): @OnOverflow only configures a channel when placed on the
// @Channel injection point (the constructor parameter); on a business method it
// has no effect. Kept here to avoid changing wiring in this edit — move it to
// the injection point for the DROP strategy to actually apply.
@OnOverflow(OnOverflow.Strategy.DROP)
public void send(GenericMetricEvent metric) {
    log.info(
        "Sending {} of schema version {} with key {} to {}",
        metric.getEventType(),
        metric.getSchemaVersion(),
        metric.getKey(),
        metric.getTopic());

    // MDC may not carry a correlation id on every thread (e.g. scheduled or
    // reactive callbacks); avoid an NPE on getBytes().
    String correlationId = MDC.get(DC_CORRELATION_ID_HEADER);
    byte[] messageIdBytes = (correlationId != null ? correlationId : "").getBytes(UTF_8);

    metricsMessageEmitter.send(
        Message.of(
                metric.getPayload(),
                () -> {
                  // Broker acknowledged the record.
                  log.info("testing ACK logging");
                  return CompletableFuture.completedFuture(null);
                },
                reason -> {
                  // Delivery failed: log at ERROR, and pass the Throwable as the
                  // last argument so SLF4J prints the full stack trace. The
                  // original call supplied three arguments for two {} placeholders,
                  // silently dropping the stack-trace argument.
                  log.error(
                      "testing NACK logging with failure reason cause {} , reason msg {}",
                      reason.getCause(),
                      reason.getMessage(),
                      reason);
                  return CompletableFuture.completedFuture(null);
                })
            .addMetadata(
                OutgoingKafkaRecordMetadata.<String>builder()
                    .withTopic(metric.getTopic())
                    .withKey(metric.getKey())
                    .withTimestamp(Instant.now())
                    .withHeaders(
                        new RecordHeaders()
                            .add("schemaVersion", metric.getSchemaVersion().getBytes(UTF_8))
                            .add("messageId", messageIdBytes)
                            .add("messageType", metric.getEventType().getBytes(UTF_8))
                            .add(
                                "eventTimestamp",
                                metric.getEventTimestamp().toString().getBytes(UTF_8)))
                    .build()));

    // Emitter.send(Message) is asynchronous: reaching this line only means the
    // message was queued on the channel, not that Kafka accepted it — the old
    // "Sent ..." wording was misleading during topic-metadata retry storms.
    log.info(
        "Queued {} of schema version {} with key {} to {}",
        metric.getEventType(),
        metric.getSchemaVersion(),
        metric.getKey(),
        metric.getTopic());
}