Golang Nats subscribe issue

6.1k views Asked by At

I work currently on a micro service architecture. Before I insert NATS into my project I wanted to test some simple scenarios with it.

In one scenario I have a simple publisher, which publishes 100.000 messages in a for loop over a basic Nats server running on localhost:4222.

The big problem with it, is the subscriber. When he receive between 30.000 - 40.000 messages my whole main.go program and all other go routines just stops and do nothing. I can just quit with ctrl + c. But the Publisher is still keep sending the messages. When I open a new terminal and start a new instance of the subscriber all again works well, till the Subscriber receive about 30000 messages. And the worst thing is that there appears not even one error and also no logs on the server so I have no idea whats going on.

After that I was trying replace the Subscribe-method with the QueueSubscribe-method and all works fine.

What is the main difference between Subscribe and QueueSubscribe?

Is NATS-Streaming a better opportunity? Or in which cases I should prefer Streaming and in which the standard NATS-Server

Here is my code:

Publisher:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/nats-io/go-nats"
)

func main() {
    go createPublisher()

    for {

    }
}

func createPublisher() {

    log.Println("pub started")

    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    msg := make([]byte, 16)

    for i := 0; i < 100000; i++ {
        nc.Publish("alenSub", msg)
        if (i % 100) == 0 {
            fmt.Println("i", i)
        }
        time.Sleep(time.Millisecond)
    }

    log.Println("pub finish")

    nc.Flush()

}

Subscriber:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/nats-io/go-nats"
)

var received int64

func main() {
    received = 0

    go createSubscriber()
    go check()

    for {

    }
}

func createSubscriber() {

    log.Println("sub started")

    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    nc.Subscribe("alenSub", func(msg *nats.Msg) {
        received++
    })
    nc.Flush()

    for {

    }
}

func check() {
    for {
        fmt.Println("-----------------------")
        fmt.Println("still running")
        fmt.Println("received", received)
        fmt.Println("-----------------------")
        time.Sleep(time.Second * 2)
    }
}
2

There are 2 answers

1
Peter Miron On BEST ANSWER

The infinite for loops are likely starving the garbage collector: https://github.com/golang/go/issues/15442#issuecomment-214965471

I was able to reproduce the issue by just running the publisher. To resolve, I recommend using a sync.WaitGroup. Here's how I updated the code linked to in the comments to get it to complete:

package main

import (
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/nats-io/go-nats"
)

// create wait group
var wg sync.WaitGroup

func main() {
    // add 1 waiter
    wg.Add(1)
    go createPublisher()

    // wait for wait group to complete
    wg.Wait()
}

func createPublisher() {

    log.Println("pub started")
    // mark wait group done after createPublisher completes
    defer wg.Done()

    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    msg := make([]byte, 16)

    for i := 0; i < 100000; i++ {
        if errPub := nc.Publish("alenSub", msg); errPub != nil {
            panic(errPub)
        }

        if (i % 100) == 0 {
            fmt.Println("i", i)
        }
        time.Sleep(time.Millisecond * 1)
    }

    log.Println("pub finish")

    errFlush := nc.Flush()
    if errFlush != nil {
        panic(errFlush)
    }

    errLast := nc.LastError()
    if errLast != nil {
        panic(errLast)
    }

}

I'd recommend updating the above subscriber code similarly.

The main difference between Subscribe and QueueSubscriber is that in Subscribe all subscribers are sent all messages from. While in QueueSubscribe only one subscriber in a QueueGroup is sent each message.

Some details on additional features for NATS Streaming are here: https://nats.io/documentation/streaming/nats-streaming-intro/

We see both NATS and NATS Streaming used in a variety of use cases from data pipelines to control planes. Your choice should be driven by the needs of your use case.

1
derek On

As stated, remove the for{} loop. Replace with runtime.Goexit().

For subscriber you don't need to create the subscriber in a Go routine. Async subscribers already have their own Go routine for callbacks.

Also protected the received variable with atomic or a mutex.

See the examples here as well.

https://github.com/nats-io/go-nats/tree/master/examples