redigo error log: write: connection reset by peer

1.2k views Asked by At

At almost the same point in time as the redigo error (write: connection reset by peer), Redis logged the following error:
Client id=45183 addr=127.0.0.1:40420 fd=39 name= age=39706 idle=46 flags=N db=0 sub=8 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=16114 oll=528 omem=8545237 events=rw cmd=ping scheduled to be closed ASAP for overcoming of output buffer limits.

go error log

write tcp 127.0.0.1:40806->127.0.0.1:6379: write: connection reset by peer

Before that, the Go program had not received any subscription messages for about 7 minutes. I presume the Redis client output buffer overflowed because the messages were not being consumed.

The Redis client-output-buffer-limit is left at its default configuration. The Linux fd and connection counts are normal, and I can't find a reason why the messages were not being consumed.

Here is my code:

server.go

// WaitFroMsg subscribes to the given channels on a connection borrowed from
// pool and blocks, handing every published message to onMessage, until one of
// the following happens:
//
//   - ctx is cancelled: the channels are unsubscribed and nil is returned, or
//     an error if the UNSUBSCRIBE could not be sent;
//   - onMessage returns an error: that error is returned unchanged;
//   - receiving from the connection fails: the wrapped error is returned;
//   - the once-a-minute health-check PING cannot be sent: that error is
//     returned.
//
// The borrowed connection is closed (handed back to the pool) on every exit
// path.
func WaitFroMsg(ctx context.Context, pool *redis.Pool, onMessage func(channel string, data []byte) error, channel ...string) (err error) {
	const (
		// healthCheck is the interval between PINGs sent on the subscribed
		// connection.
		healthCheck = time.Minute
		// receiverExitWait bounds how long we wait for the receive goroutine
		// to stop before closing the connection anyway.
		receiverExitWait = 5 * time.Second
	)

	conn := pool.Get()
	psc := redis.PubSubConn{Conn: conn}
	if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
		// The receive goroutine has not been started yet, so the connection
		// can be released immediately. The Close error is deliberately
		// ignored: the subscribe error is the one worth reporting.
		_ = conn.Close()
		return err
	}

	// done carries the receive goroutine's single result; the buffer of one
	// lets the goroutine finish even when nobody reads the value.
	done := make(chan error, 1)
	// exited is closed once the receive goroutine has returned.
	exited := make(chan struct{})
	go func() {
		defer close(exited)
		for {
			switch n := psc.Receive().(type) {
			case error:
				done <- fmt.Errorf("redis pubsub receive err: %w", n)
				return
			case redis.Message:
				// Use a goroutine-local err: assigning to the function's
				// named result from here would race with the caller's
				// goroutine.
				if err := onMessage(n.Channel, n.Data); err != nil {
					done <- err
					return
				}
			case redis.Subscription:
				if n.Count == 0 {
					fmt.Println("all channels are unsubscribed", channel)
					done <- nil
					return
				}
			}
		}
	}()

	// Release the connection on every return below. Redigo supports only one
	// concurrent Receive caller, and closing a pooled connection may itself
	// read from it, so give the receive goroutine a bounded chance to stop
	// first instead of closing underneath it.
	defer func() {
		select {
		case <-exited:
		case <-time.After(receiverExitWait):
		}
		// Nothing useful can be done with a Close error at this point.
		_ = conn.Close()
	}()

	ticker := time.NewTicker(healthCheck)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if err := psc.Ping(""); err != nil {
				fmt.Println("healthCheck ", err, channel)
				return err
			}
		case err := <-done:
			return err
		case <-ctx.Done():
			// Unsubscribing from all channels makes the server report a
			// subscription count of zero, which ends the receive goroutine.
			if err := psc.Unsubscribe(); err != nil {
				return fmt.Errorf("redis unsubscribe failed: %w", err)
			}
			return nil
		}
	}
}

pool.go

// NewPool returns a redigo connection pool for the Redis server at addr.
//
// Every connection the pool dials issues SELECT db before it is handed out;
// if the SELECT fails the connection is closed and the error is returned from
// Dial. The pool keeps at most 3 idle connections and sets an idle timeout of
// 240 seconds. A connection that has been idle for a minute or more is
// checked with PING before it is reused.
func NewPool(addr string, db int) *redis.Pool {
	return &redis.Pool{
		MaxIdle:     3,
		IdleTimeout: 240 * time.Second,
		Dial: func() (redis.Conn, error) {
			c, err := redis.Dial("tcp", addr)
			if err != nil {
				return nil, err
			}
			if _, err = c.Do("SELECT", db); err != nil {
				// The SELECT error is the one reported; a failure to close
				// the half-initialised connection adds nothing to it.
				_ = c.Close()
				return nil, err
			}
			return c, nil
		},
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			// Connections used within the last minute are trusted as-is.
			if time.Since(t) < time.Minute {
				return nil
			}
			_, err := c.Do("PING")
			if err != nil {
				// Log only real failures; a successful check used to print
				// a misleading "PING error <nil>" line.
				fmt.Println("PING error", err)
			}
			return err
		},
	}
}
0

There are 0 answers