How can I reset the topic offset while consuming from Kafka topic?

20 views Asked by At

I start to consume from the last message of a Kafka topic with following code. (Confluent Kafka). What I want to do is:

  1. I want to consume the last 50 messages of a Kafka topic
  2. wait for 5 seconds and consume again the last 50 messages (The data in between for 5 seconds shall be discarded)
  3. This should be done cyclic in the while loop.

I have following code: Currently when consume 50 messages, wait for 5 seconds and proceed with consuming, the consuming will be proceeded where it was paused. I have marked the line below where I should be able to reset the offset to the last message. How can I do that?

    public async Task Read_Status_from_Kafka_test2()
    {
        Int32 counter = 0;
        config = new ConsumerConfig()
        {
            BootstrapServers = "servers"
            GroupId = "foo3",
            AutoOffsetReset = AutoOffsetReset.Latest,
            EnableAutoCommit = false,
        };
        using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            c.Subscribe("myTopic");
            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        counter += 1;
                        Console.WriteLine($"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}'.");
                        if (counter>50)
                        {
                            counter = 0;
                            // here I want to set the offset again to the last message !!!!!!!!!                        
                            Thread.Sleep(5000);
                        }
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }

    }
0

There are 0 answers