Add new topics to sarama.ConsumerGroup

I'm new to the Sarama library, so I apologize in advance for any basic questions.

I'm currently working with sarama.ConsumerGroup and have implemented my own ConsumerGroupHandler. The primary goal is to consume messages from topics that are newly added in Kafka while consumerGroup.Consume() is already running.

When I call the AddTopics function below, the Sarama debug log shows that the new topics are added. The rebalance appears to complete in the Setup function, but then something unexpected happens: the ConsumeClaim method is not invoked. Most interestingly, messages are consumed only from the second event onward.

What am I doing wrong? What are the steps to add new topics to a running consumer group? Is that doable?

Thank you in advance for any input.

Here is the AddTopics function, together with the rebalanceConsumer helper it calls:


    func (c *Consumer) AddTopics(newTopics []string) (*Consumer, error) {
        log.Debugf(" AddTopics is triggered ")
        c.cancelFunc() // Cancel the current context
        // Create a new context with the updated list of topics
        newCtx, newCancelFunc := context.WithCancel(context.Background())
        c.ctx = newCtx
        c.cancelFunc = newCancelFunc
    
        c.handler.mutex.Lock()
        defer c.handler.mutex.Unlock()
    
        c.Topics = append(c.Topics, newTopics...)
    
        wg := &sync.WaitGroup{}
        wg.Add(1)
        return rebalanceConsumer(c), nil
    }

    func rebalanceConsumer(c *Consumer) *Consumer {
        go func() {
            // Consumer loop that continuously checks the context for cancellation
            for {
                select {
                case <-c.ctx.Done():
                    // Context canceled, exit the loop
                    log.Printf("Context canceled, exiting consumer loop")
                    return
                default:
                    if !c.isTryingReconnect {
                        log.Infof("joining consumer group")
                    }
                    log.Debugf("Rebalancing, adding topic to consume from: %v", c.Topics)
                    err := c.Client.Consume(c.ctx, c.Topics, c.handler)
                    if err != nil {
                        if err == sarama.ErrUnknownTopicOrPartition {
                            log.Fatalf("The topic or partition [%s] does not exist on this broker: %v", c.Topics, err)
                        }
                        if !c.isTryingReconnect {
                            log.Infof("error when kafka consume: %v", err)
                        }
                        c.isTryingReconnect = true
                        // wait a second then try to reconnect
                        time.Sleep(time.Second)
                        continue
                    }
                }
                log.Infof("session ended normally")
                c.handler.readyChan = make(chan bool)
            }
        }()
        <-c.handler.readyChan
        return c
    }


Log output from the consumer after the new topics are added:

    2023-12-21T19:40:42.507231-05:00    [DEBUG] [TEST] consumer.go:290 in kafka.(*consumerGroupHandlerImpl).Setup The Setup is finished successfully
    [Sarama-Consumer] 2023/12/21 19:40:42 consumer/broker/0 accumulated 4 new subscriptions
    [Sarama-Consumer] 2023/12/21 19:40:42 consumer/broker/0 added subscription to sink.conflict.acbfb023-bbea-470a-a3f8-0e91979458c1/0
    [Sarama-Consumer] 2023/12/21 19:40:42 consumer/broker/0 added subscription to source.error.2472ed7c-16e5-4c14-a51f-86daf69c39d6/0
    [Sarama-Consumer] 2023/12/21 19:40:42 consumer/broker/0 added subscription to sink.unresolvable.acbfb023-bbea-470a-a3f8-0e91979458c1/0
    [Sarama-Consumer] 2023/12/21 19:40:42 consumer/broker/0 added subscription to epi.swift.tests.50000000-0000-5000-8000-000000000000/0