In my .NET C# project (using the Confluent.Kafka library) I am currently using the following code to read the newest message from a Kafka topic. With this code I can read the newest message from a single, hard-coded partition. However, the Kafka broker writes the values of my topic to a different partition each time (my topic is configured with partitions 0, 1, and 2), so the last (newest) message in that one partition is not always the newest message the data source has sent to Kafka.

How can I adapt my code for three partitions? Is there a simple function for this in Confluent.Kafka? Or do I have to read the message at Offset.End from every partition each time, check their timestamps, and decide which one is the newest?

        CancellationTokenSource source = new CancellationTokenSource();
        CancellationToken cancellationToken = source.Token;
        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            consumer.Subscribe("My_Topic");               

            while (true)
            {
                TopicPartitionOffset tps = new TopicPartitionOffset(new TopicPartition("My_Topic", 1),Offset.End);
                consumer.Assign(tps);
                var consumeResult = consumer.Consume(cancellationToken);                      
                
                Kafka_message_total = consumeResult.Message.Value;

                // additional code to send the message value to an application

                System.Threading.Thread.Sleep(2000);

            }

            consumer.Close();
        }

1 Answer

OneCricketeer (accepted answer):

When you disable consumer-group commits and set AutoOffsetReset=Latest, the consumer will always start reading from the end of the topic, for all partitions.
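A configuration along those lines might look like the following sketch; the broker address and group id are placeholders, not values from the question:

```csharp
using Confluent.Kafka;

// Consumer configuration that always starts at the end of every partition
// when there is no committed offset, and never commits offsets itself.
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",      // placeholder: point at your broker
    GroupId = "newest-message-reader",        // required, but offsets are never committed
    EnableAutoCommit = false,                 // disable consumer-group commits
    AutoOffsetReset = AutoOffsetReset.Latest  // start reading from the end
};
```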

do I have to read each time from all partitions the message with Offset.End, check their timestamp, and decide which one is the newest one?

Yes. You can use Offset.End, or seek the consumer to the end of the topic at any time, or query the end offset, subtract one, and seek there.
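The "end offset minus one" approach might look like this sketch: for each of the three partitions, query the watermark offsets, read the last existing message, and keep the one with the newest timestamp. The broker address and group id are placeholders; "My_Topic" and the partition count come from the question.

```csharp
using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",  // placeholder: point at your broker
    GroupId = "newest-message-reader",
    EnableAutoCommit = false
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();

ConsumeResult<Ignore, string> newest = null;

for (int p = 0; p < 3; p++)
{
    var tp = new TopicPartition("My_Topic", p);

    // High is the offset one past the last message in the partition.
    var watermarks = consumer.QueryWatermarkOffsets(tp, TimeSpan.FromSeconds(5));
    if (watermarks.High == watermarks.Low)
        continue; // partition is empty

    // Seek to the last message (end offset minus one) and read it.
    consumer.Assign(new TopicPartitionOffset(tp, watermarks.High.Value - 1));
    var result = consumer.Consume(TimeSpan.FromSeconds(5));

    if (result != null &&
        (newest == null ||
         result.Message.Timestamp.UtcDateTime > newest.Message.Timestamp.UtcDateTime))
    {
        newest = result;
    }
}

if (newest != null)
{
    Console.WriteLine($"Newest message: {newest.Message.Value}");
}
```

This reads exactly one message per partition per pass, which fits the question's 2-second polling loop.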

You simply need a loop of TopicPartition objects, one per partition, to assign. However, this does not parallelize as well as consumer groups using the Subscribe API.
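That loop of assignments might be sketched as follows: assign all three partitions at Offset.End at once, then let Consume block until the next message arrives on any of them, which by construction is the newest message on the topic. The broker address and group id are placeholders.

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",  // placeholder: point at your broker
    GroupId = "newest-message-reader",
    EnableAutoCommit = false
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();

// Assign all three partitions, each starting from its current end.
var assignments = new List<TopicPartitionOffset>();
for (int p = 0; p < 3; p++)
{
    assignments.Add(new TopicPartitionOffset(
        new TopicPartition("My_Topic", p), Offset.End));
}
consumer.Assign(assignments);

while (true)
{
    // Consume returns the next message published to any assigned partition.
    var consumeResult = consumer.Consume(CancellationToken.None);
    Console.WriteLine(
        $"P-{consumeResult.Partition.Value}: {consumeResult.Message.Value}");
}
```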

I see that the message is sometimes coming to P-0, sometimes to P-1, and sometimes to P-2. I don't know how Kafka decides which partition to write the incoming message to.

The Kafka documentation explains how the producer partitions records: keyed records by a hash of the record key, and null-keyed records in a round-robin fashion.
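As a simplified illustration of that behavior (not Kafka's actual implementation, which uses murmur2 hashing, and whose newer clients use a sticky partitioner for null keys), key-based partitioning boils down to hashing the key modulo the partition count:

```csharp
using System;

int nextRoundRobin = 0;

int ChoosePartition(string key, int partitionCount)
{
    if (key == null)
    {
        // Round-robin across partitions for null keys.
        return nextRoundRobin++ % partitionCount;
    }
    // Stable hash of the key, mapped into the partition range:
    // the same key always lands on the same partition.
    return (key.GetHashCode() & 0x7FFFFFFF) % partitionCount;
}

// Same key -> same partition; null keys rotate through 0, 1, 2, 0, ...
Console.WriteLine(ChoosePartition("sensor-1", 3) == ChoosePartition("sensor-1", 3));
```

So if the question's producer sends messages without a key, seeing them spread across P-0, P-1, and P-2 is the expected behavior.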