I am using nats-jetstream and in that made two consumers as per below code with diff. subjects and same stream. Single publisher code file sends event to both consumers with its subjects. Used durable consumer with unique name for both.
I deployed this two consumers code in two kuberenetes pods in same namespace also used another single pod for publisher. When publisher sends event to both, the first consumer gets the event. another one does not. I am trying to solve this issue considering as race condition for subscribe event in both consumer pods. Still exact don't know about this issue. Please help me.
Code:
func StartNatsJetstream() error {
fmt.Println("StartNatsJetstream runs")
StartSubject := common.SoConfig.MessageBus.WorkflowRequestQueue + ".*"
startConsumer := "tart-consumer" //Consumer name can be anything
fmt.Printf("StartNatsJetstream runs%s", tartSubject)
// Connect to the NATS server
nc, js, err := natsjetstream.SetupJetStream(startSubject)
if err != nil {
log.Errorf("Nats server is not running", err)
return err
}
defer nc.Close()
fmt.Println("Connection got")
// Subscribe to the stream and process messages
_, err = js.Subscribe(StartSubject, func(msg *nats.Msg) {
fmt.Printf("Received message: %s\n", msg.Data)
// Acknowledge the message to prevent it from being sent again
err := msg.Ack()
if err != nil {
fmt.Printf("Error acknowledging message: %v\n", err)
return
}
// Process the message
receivedPayload := msg.Data
var msgData Message
err = json.Unmarshal([]byte(receivedPayload), &msgData)
if err != nil {
fmt.Println("Error unmarshaling JSON:", err)
return
}
//Decoded byte message understand to the user in logs
decodedPayload, err := base64.StdEncoding.DecodeString(msgData.Payload)
if err != nil {
fmt.Println("Error decoding payload:", err)
return
}
var payloadData PayloadData
err = json.Unmarshal(decodedPayload, &payloadData)
if err != nil {
fmt.Println("Error unmarshaling JSON from decoded payload:", err)
return
}
msgData.Payload = string(decodedPayload)
fmt.Printf("Received message: %+v\n", msgData)
//Send byte msgData to process workflow req.
go processMessageNatsJetstream(msgData)
}, nats.Durable(StartConsumer))
if err != nil {
log.Errorf("Error while subscribe", err)
}
fmt.Println("Function completed")
// To keep the goroutine running
select {}
}
Please help me about to identify the issue.