I use confluent .net client. Subscriber always reads all messages from Kafka topic after restarting(subscriber service restarting). How to commit offset that consumer already achieved and read from it? Maybe some consumer configuration can help...
Why does consumer read all messages from Kafka topic after restart?
3k views Asked by John AtThere are 2 answers
This is just a wild guess, but how do you declare the consumer's group id? I've seen some examples using this kind of random assignation:
["group.id"] = Guid.NewGuid().ToString(),
If you declare a new/random group.id
each time you launch the consumers, that would lead to the registration of a new consumer group on each execution, which involves auto.offset.reset
kicking in.
If this property is set to "earliest
", then every time you launch your consumers (assuming they have a different group.id each time), they will start from the first avaliable offset, as in your case, reading all the messages again from the beginning.
If this property is set to "latest
", and your producers are currently not sending any messages, you won't be able to read anything, which could create some confusion.
Try setting a fixed group.id
: start consuming, stop the process while messages are still avaliable on the broker, and launch the consumer again, without changing the last group.id
.
This time, as the consumer group is already registered, auto.offset.reset
will be ignored and the starting position would be defined by your commited offsets, which are by default stored on a special topic called __consumer_offsets
.
You have two options:
enable auto commit by setting the consumer properties
EnableAutoCommit = true
(the messages are getting committed after a configurable time, typically 5 seconds), ormanually commit the fetched offsets by
consumer.Commit(consumeResult)
.An example for the manual commits is shown on GitHub.