How can I create a Kafka DLQ for a consumer using ASP.NET Core?
    var kafkaConfig = new ConsumerConfig
    {
        GroupId = _configuration["ConsumerGroup"],
        BootstrapServers = _configuration["KafkaServer"]
    };
    using (var consumer = new ConsumerBuilder<int, string>(kafkaConfig).Build())
    {
        consumer.Subscribe(_configuration["Topic"]);
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var consumeResult = consumer.Consume(stoppingToken);
                    Console.WriteLine($"Consumed message '{consumeResult.Value}' at: '{consumeResult.TopicPartitionOffset}'.");
                    // Hand the record to the retry producer when processing fails.
                    if (!TryConsume(consumeResult, stoppingToken))
                    {
                        await _retryQueueProducer.RetryAsync(consumeResult);
                    }
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error occurred: {e.Error.Reason}");
                }
            }
        }
        finally
        {
            // Ensure the consumer leaves the group cleanly and final offsets are committed.
            consumer.Close();
        }
    }
The code above is a sample.
How do I show the values of the failed messages in a DLQ?
You already know how to use a producer and how to write a try/catch; that's all you need.
Catch the exception raised while processing a consumed record, then use a producer to send that record to a separate topic. That topic is your DLQ.
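As a rough sketch of that idea, assuming the Confluent.Kafka client and the same `int`/`string` key and value types as the consumer in the question: the class name `DeadLetterProducer`, the header names `dlq-error` and `dlq-source`, and the constructor parameters are all made up for illustration, not part of any library.

```csharp
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical helper: forwards a failed record to a dead-letter topic.
public sealed class DeadLetterProducer : IDisposable
{
    private readonly IProducer<int, string> _producer;
    private readonly string _dlqTopic;

    public DeadLetterProducer(string bootstrapServers, string dlqTopic)
    {
        _dlqTopic = dlqTopic;
        _producer = new ProducerBuilder<int, string>(
            new ProducerConfig { BootstrapServers = bootstrapServers }).Build();
    }

    public Task<DeliveryResult<int, string>> SendAsync(
        ConsumeResult<int, string> failed,
        Exception error,
        CancellationToken ct = default)
    {
        // Copy the original headers, then add the failure details
        // (header names are an assumption of this sketch).
        var headers = new Headers();
        if (failed.Message.Headers != null)
        {
            foreach (var header in failed.Message.Headers)
            {
                headers.Add(header.Key, header.GetValueBytes());
            }
        }
        headers.Add("dlq-error", Encoding.UTF8.GetBytes(error.Message));
        headers.Add("dlq-source",
            Encoding.UTF8.GetBytes(failed.TopicPartitionOffset.ToString()));

        var message = new Message<int, string>
        {
            Key = failed.Message.Key,
            Value = failed.Message.Value,
            Headers = headers
        };
        return _producer.ProduceAsync(_dlqTopic, message, ct);
    }

    public void Dispose()
    {
        // Give in-flight DLQ messages a chance to be delivered before shutdown.
        _producer.Flush(TimeSpan.FromSeconds(10));
        _producer.Dispose();
    }
}
```

In the consume loop, you would wrap your processing call in its own try/catch and, in the catch block, call something like `await dlq.SendAsync(consumeResult, ex, stoppingToken);` before moving on to the next record.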
To see the failed messages, write a second consumer that subscribes to the DLQ topic.
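A minimal sketch of such a DLQ reader, again assuming the Confluent.Kafka client. The class name `DeadLetterReader`, the group id `dlq-inspector`, and the `dlq-error` header are hypothetical; the header is only printed if whoever produced to the DLQ actually wrote it.

```csharp
using System;
using System.Text;
using System.Threading;
using Confluent.Kafka;

// Hypothetical reader that prints every record found on the DLQ topic.
public static class DeadLetterReader
{
    public static void Run(string bootstrapServers, string dlqTopic, CancellationToken ct)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            GroupId = "dlq-inspector",                  // assumed group id
            AutoOffsetReset = AutoOffsetReset.Earliest  // read the DLQ from the start
        };

        using var consumer = new ConsumerBuilder<int, string>(config).Build();
        consumer.Subscribe(dlqTopic);
        try
        {
            while (!ct.IsCancellationRequested)
            {
                var result = consumer.Consume(ct);

                // The failure reason is optional; fall back when the header is missing.
                var reason = "(no reason recorded)";
                if (result.Message.Headers != null
                    && result.Message.Headers.TryGetLastBytes("dlq-error", out var bytes)
                    && bytes != null)
                {
                    reason = Encoding.UTF8.GetString(bytes);
                }

                Console.WriteLine(
                    $"DLQ value '{result.Message.Value}' at {result.TopicPartitionOffset}: {reason}");
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown path when the token is cancelled.
        }
        finally
        {
            consumer.Close();
        }
    }
}
```

In an ASP.NET Core app this loop could run inside its own `BackgroundService`, separate from the main consumer, so that inspecting the DLQ never blocks normal processing.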
You should create the actual topic outside of your code.