As far as I know, there exist two configurations related to the heartbeat timeout and max poll interval in Java kafka consumer, which are:
session.timeout.ms
max.poll.interval.ms
If the interval between two consecutive poll() is longer than max.poll.interval.ms, this client will leave group actively.
However, I can only find one similar configuration in Golang sarama, which is Consumer.Group.Session.Timeout, corresponding the session.timeout.ms in Java. It seems there is not a configuration in sarama that has the same effect of max.poll.interval.ms in Java.
So my question is:
Will a sarama client trigger rebalance if this client does not consume messages for a long time(eg. 5min or 30min or longer), while the heartbeat goroutine is still working?
With [email protected], the only effect of long time no consuming I can see is the invalid loop within fetchNewMessages(), which will not send LeaveGroupRequest to brokers.
func (bc *brokerConsumer) subscriptionConsumer() {
for newSubscriptions := range bc.newSubscriptions {
bc.updateSubscriptions(newSubscriptions)
if len(bc.subscriptions) == 0 {
// We're about to be shut down or we're about to receive more subscriptions.
// Take a small nap to avoid burning the CPU.
time.Sleep(partitionConsumersBatchTimeout)
continue
}
response, err := bc.fetchNewMessages()
if err != nil {
Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
bc.abort(err)
return
}
...
The question details are above.
Sarama has
MaxProcessingTime
as a config parameter similar to the Java client'smax.poll.interval.ms
source.
And there is a related issue like this in their repo