I work currently on a micro service architecture. Before I insert NATS into my project I wanted to test some simple scenarios with it.
In one scenario I have a simple publisher, which publishes 100.000 messages in a for loop over a basic Nats server running on localhost:4222.
The big problem with it, is the subscriber. When he receive between 30.000 - 40.000 messages my whole main.go program and all other go routines just stops and do nothing. I can just quit with ctrl + c. But the Publisher is still keep sending the messages. When I open a new terminal and start a new instance of the subscriber all again works well, till the Subscriber receive about 30000 messages. And the worst thing is that there appears not even one error and also no logs on the server so I have no idea whats going on.
After that I was trying replace the Subscribe-method with the QueueSubscribe-method and all works fine.
What is the main difference between Subscribe and QueueSubscribe?
Is NATS-Streaming a better opportunity? Or in which cases I should prefer Streaming and in which the standard NATS-Server
Here is my code:
Publisher:
package main
import (
"fmt"
"log"
"time"
"github.com/nats-io/go-nats"
)
func main() {
go createPublisher()
for {
}
}
func createPublisher() {
log.Println("pub started")
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
msg := make([]byte, 16)
for i := 0; i < 100000; i++ {
nc.Publish("alenSub", msg)
if (i % 100) == 0 {
fmt.Println("i", i)
}
time.Sleep(time.Millisecond)
}
log.Println("pub finish")
nc.Flush()
}
Subscriber:
package main
import (
"fmt"
"log"
"time"
"github.com/nats-io/go-nats"
)
var received int64
func main() {
received = 0
go createSubscriber()
go check()
for {
}
}
func createSubscriber() {
log.Println("sub started")
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
nc.Subscribe("alenSub", func(msg *nats.Msg) {
received++
})
nc.Flush()
for {
}
}
func check() {
for {
fmt.Println("-----------------------")
fmt.Println("still running")
fmt.Println("received", received)
fmt.Println("-----------------------")
time.Sleep(time.Second * 2)
}
}
The infinite
for
loops are likely starving the garbage collector: https://github.com/golang/go/issues/15442#issuecomment-214965471I was able to reproduce the issue by just running the publisher. To resolve, I recommend using a
sync.WaitGroup
. Here's how I updated the code linked to in the comments to get it to complete:I'd recommend updating the above subscriber code similarly.
The main difference between
Subscribe
andQueueSubscriber
is that inSubscribe
all subscribers are sent all messages from. While inQueueSubscribe
only one subscriber in aQueueGroup
is sent each message.Some details on additional features for NATS Streaming are here: https://nats.io/documentation/streaming/nats-streaming-intro/
We see both NATS and NATS Streaming used in a variety of use cases from data pipelines to control planes. Your choice should be driven by the needs of your use case.