Will sarama client trigger rebalance if this client does not fetch messages for a long time?


As far as I know, the Java Kafka consumer has two configurations related to the heartbeat timeout and the maximum poll interval:

session.timeout.ms

max.poll.interval.ms

If the interval between two consecutive poll() calls is longer than max.poll.interval.ms, the client proactively leaves the group.

However, I can find only one similar configuration in the Go client sarama: Consumer.Group.Session.Timeout, which corresponds to session.timeout.ms in Java. Sarama does not seem to have a configuration with the same effect as max.poll.interval.ms in Java.

So my question is:

Will a sarama client trigger a rebalance if it does not consume messages for a long time (e.g. 5 minutes, 30 minutes, or longer) while the heartbeat goroutine is still working?

In the sarama version I am using, the only effect of not consuming for a long time that I can see is that the loop around fetchNewMessages() keeps running to no effect; it does not send a LeaveGroupRequest to the brokers.

func (bc *brokerConsumer) subscriptionConsumer() {
    for newSubscriptions := range bc.newSubscriptions {
        bc.updateSubscriptions(newSubscriptions)

        if len(bc.subscriptions) == 0 {
            // We're about to be shut down or we're about to receive more subscriptions.
            // Take a small nap to avoid burning the CPU.
            time.Sleep(partitionConsumersBatchTimeout)
            continue
        }

        response, err := bc.fetchNewMessages()
        if err != nil {
            Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
            bc.abort(err)
            return
        }
...


There is 1 answer

Answer by Rahmat Fathoni

Sarama has Consumer.MaxProcessingTime, a config parameter similar to the Java client's max.poll.interval.ms:

        // The maximum amount of time the consumer expects a message takes to
        // process for the user. If writing to the Messages channel takes longer
        // than this, that partition will stop fetching more messages until it
        // can proceed again.
        // Note that, since the Messages channel is buffered, the actual grace time is
        // (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms.
        // If a message is not written to the Messages channel between two ticks
        // of the expiryTicker then a timeout is detected.
        // Using a ticker instead of a timer to detect timeouts should typically
        // result in many fewer calls to Timer functions which may result in a
        // significant performance improvement if many messages are being sent
        // and timeouts are infrequent.
        // The disadvantage of using a ticker instead of a timer is that
        // timeouts will be less accurate. That is, the effective timeout could
        // be between `MaxProcessingTime` and `2 * MaxProcessingTime`. For
        // example, if `MaxProcessingTime` is 100ms then a delay of 180ms
        // between two messages being sent may not be recognized as a timeout.

(Source: the doc comment on Consumer.MaxProcessingTime in sarama's config.go.)

There is also a related issue about this in the sarama repository.