How to implement Kafka DLQ in asp.net

263 views Asked by At

How can I create kafka DLQ for consumer using asp.net core

            var kafkaConfig = new ConsumerConfig
            { 
                GroupId = _configuration["ConsumerGroup"],
                BootstrapServers = _configuration["KafkaServer"]
            };

            using (var consumer = new ConsumerBuilder<int, string>(kafkaConfig).Build())
            {
                consumer.Subscribe(_configuration["Topic"]);

                try
                {
                    while (!stoppingToken.IsCancellationRequested)
                    {
                        try 
                        {
                            var consumeResult = consumer.Consume(stoppingToken);
                            Console.WriteLine($"Consumed message '{consumeResult.Value}' at: '{consumeResult.TopicPartitionOffset}'.");

                            if (!TryConsume(consumeResult, stoppingToken)) 
                            {
                                await _retryQueueProducer.RetryAsync(consumeResult);
                            }
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Error occured: {e.Error.Reason}");
                        }
                    }
                }
                finally
                {
                    // Ensure the consumer leaves the group cleanly and final offsets are committed.
                    consumer.Close();
                }
            }

above one is sample code

How to show the failed messages values in DLQ

1

There are 1 answers

4
OneCricketeer On

You know how to use a Producer, and you know how to write a try catch.

That's all you need. Catch exception from a consumed record, and then use a producer to send to a new topic, you now have a DLQ.

Write a new consumer to subscribe to the DLQ.

You should create the actual topic outside of your code.