I currently have a program that creates a workergroup of size 1, which then calls startworker:
package main
import (
"db_write_consumer/db"
"db_write_consumer/worker"
"os"
"os/signal"
"syscall"
)
func main() {
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
mySQLClient, _ := db.NewMySQLDBClient("root", "", "localhost", 3306, "testbase")
workers := worker.CreateGroup("localhost:9092", "testgroup", 1)
for _, w := range workers {
w_ := w
worker.StartWorker(w_, []string{"test-topic"}, sigchan, mySQLClient)
}
}
where CreateGroup is written:
func CreateGroup(bootstrapServers string, groupId string, numWorkers int) []*kafka.Consumer {
consumers := []*kafka.Consumer{}
for i := 0; i < numWorkers; i++ {
c := NewWorker(bootstrapServers, groupId)
consumers = append(consumers, c)
}
return consumers
}
and Startworker is written:
func StartWorker(c *kafka.Consumer, topics []string, sigchan chan os.Signal, mySQLClient *sql.DB) {
_ = c.SubscribeTopics(topics, nil)
fmt.Println(c)
run := true
for run {
select {
case sig := <-sigchan:
fmt.Printf("Caught signal %v: terminating\n", sig)
run = false
default:
ev, _ := c.ReadMessage(100)
if ev == nil {
continue
}
msg := &pb.Person{}
proto.Unmarshal(ev.Value, msg)
WriteStuff(mySQLClient, msg.Id, msg.Lastname, msg.Firstname, msg.Address, msg.City)
if ev.Headers != nil {
fmt.Printf("%% Headers: %v\n", ev.Headers)
}
_, err := c.StoreMessage(ev)
if err != nil {
fmt.Fprintf(os.Stderr, "%% Error storing offset after message %s:\n",
ev.TopicPartition)
}
}
}
fmt.Printf("Closing consumer\n")
c.Close()
}
this works fine for workergroup size 1, but every attempt to make this work for a larger workergroup size fails--all i've learned so far is that i'll want context.WithCancel(context.Background()) passed down into the worker funcs from main, but i'm lost with how to set up a waitgroup or goroutines to actually do this work
I understand that your question is how to manage lifetime of workers using context (instead of
sigchan). Easiest way is to use signal.NotifyContext - this gives you a context which gets cancelled when one of the signals is sent. So the main would becomeNote also the use of WaitGroup to avoid the
mainexiting before all the workers finish. AndStartWorkerwould be like