I am trying to have some consumers process messages from Kafka, and I would like to scale a Kubernetes Deployment for elastic message-processing capacity.
I found this code in the official sarama docs: https://pkg.go.dev/github.com/Shopify/sarama#NewConsumerGroup
package main

import (
	"context"
	"fmt"

	"github.com/Shopify/sarama"
)

type exampleConsumerGroupHandler struct{}

func (exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
		sess.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	// The docs example uses NewTestConfig, which is only available inside
	// sarama's own tests; NewConfig is the exported constructor.
	config := sarama.NewConfig()
	config.Version = sarama.V2_0_0_0 // specify appropriate version
	config.Consumer.Return.Errors = true

	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
	if err != nil {
		panic(err)
	}
	defer func() { _ = group.Close() }()

	// Track errors
	go func() {
		for err := range group.Errors() {
			fmt.Println("ERROR", err)
		}
	}()

	// Iterate over consumer sessions.
	ctx := context.Background()
	for {
		topics := []string{"my-topic"}
		handler := exampleConsumerGroupHandler{}

		// `Consume` should be called inside an infinite loop; when a
		// server-side rebalance happens, the consumer session will need to
		// be recreated to get the new claims.
		err := group.Consume(ctx, topics, handler)
		if err != nil {
			panic(err)
		}
	}
}
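Since a Kubernetes scale-down sends SIGTERM to the Pod, I am thinking of replacing context.Background() with a signal-aware context so the consumer leaves the group cleanly on shutdown. A sketch of the adjusted main (the signal wiring is my own addition, not part of the docs example; the handler is the one defined above):

package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_0_0_0

	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
	if err != nil {
		panic(err)
	}
	defer func() { _ = group.Close() }()

	// Cancel the context on SIGTERM/SIGINT so Consume returns and the
	// consumer leaves the group cleanly, letting the remaining replicas
	// rebalance instead of waiting for a session timeout.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	for {
		// exampleConsumerGroupHandler is the handler from the example above.
		if err := group.Consume(ctx, []string{"my-topic"}, exampleConsumerGroupHandler{}); err != nil {
			fmt.Println("ERROR", err)
		}
		if ctx.Err() != nil {
			return // context cancelled: shut down
		}
	}
}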
I have some questions:
- How do I set the number of consumers in a consumer group?
- If I deploy this program in a Pod, can I scale it safely? That is, assuming one instance is running and I scale the replicas from 1 to 2, will a second NewConsumerGroup call with the same group ID work without conflict?
Thank you in advance.
NOTE: I am using Kafka 2.8, and I heard that the sarama-cluster package is DEPRECATED.
A reminder that a consumer group cannot scale beyond the topic's partition count; any group members beyond the number of partitions will sit idle.
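If it helps, you can check the partition count from Go before picking a replica count; a quick sketch, assuming the same broker address and topic as the question:

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_0_0_0

	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
	if err != nil {
		panic(err)
	}
	defer func() { _ = client.Close() }()

	// Partitions returns the IDs of the topic's partitions; its length is
	// the maximum number of group members that can do useful work.
	partitions, err := client.Partitions("my-topic")
	if err != nil {
		panic(err)
	}
	fmt.Printf("my-topic has %d partitions; replicas beyond that will sit idle\n", len(partitions))
}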
Scaling the Pods is the correct way to use consumer groups, and reusing the same group name is correct as well. However, I'd recommend extracting the group name and the broker address into environment variables so they can easily be changed at deploy time (see the sketch below).
As-is, the containerized code would be unable to use localhost as the Kafka bootstrap address, since inside a Pod that resolves to the Pod itself; point it at an address that is reachable from the cluster, e.g. the DNS name of a Kubernetes Service fronting the brokers.
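A minimal sketch of that configuration pattern; the variable names KAFKA_BROKERS and KAFKA_GROUP_ID, and the Service address in the fallbacks, are my own choices, not anything sarama or Kubernetes mandates:

package main

import (
	"os"
	"strings"
)

// getenv returns the value of key, or fallback if the variable is unset or empty.
func getenv(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

// kafkaSettings reads the broker list and group ID from the environment,
// e.g. KAFKA_BROKERS=kafka-0.kafka:9092,kafka-1.kafka:9092 set in the
// Deployment spec.
func kafkaSettings() (brokers []string, groupID string) {
	brokers = strings.Split(getenv("KAFKA_BROKERS", "kafka.default.svc.cluster.local:9092"), ",")
	groupID = getenv("KAFKA_GROUP_ID", "my-group")
	return brokers, groupID
}

The hard-coded sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", config) call would then become sarama.NewConsumerGroup(brokers, groupID, config).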