When I use kafka-producer-perf-test.sh to simulate the producer sending 10,000 messages to a topic 't' with 2 partitions, and use the consumer code mentioned below with dfv1.CommitN
set to 3, I observed that when consuming message number 9,748, the sess.Commit()
operation becomes significantly slow, taking around 500-1,004 milliseconds. However, when consuming messages from number 9,747 onwards, the sess.Commit()
operation takes only around 1-3 milliseconds.
func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
i := 0
for m := range claim.Messages() {
msg := m.Value
if err := h.f(context.Background(), msg); err != nil {
// noop
} else {
sess.MarkMessage(m, "")
i++
if i%dfv1.CommitN == 0 {
t1 := time.Now()
sess.Commit()
t2 := time.Now()
fmt.Printf("consume cost: %d ms\n", t2.Sub(t1).milliSeconds)
}
}
}
return nil
}
Why the consume becomes slower after consuming the message number 9747?
Hope the cost time around 1-3ms for each sess.Commit()