How do you manually flush Write in a direct p2p connection with libp2p-go?

140 views Asked by At

I cannot find an API to flush Write in a direct connection with a peer in libp2p (it works fine in pubsub). I create a direct libp2p connection in Go using libp2p-go using host.NewStream. The stream is used by the wrapper below. The problem is that when I write to this stream, the peer at the other end does not receive the message. The underlying network.Stream seems to be internally buffered in libp2p's networking. But I cannot find a way to flush writes anywhere in network.Stream or any of the other libp2p subpackages in go-libp2p-core. Actually I can't find it anywhere. Changing my wrapper to use bufio readers and writers does not have an effect, because it doesn't flush libp2p's internal buffering. I know the messages are sent because when I close the stream, the remote peer receives them.

// NetReaderWriter is a wrapper for a libp2p network stream to an individual peer.
type NetReaderWriter struct {
    stream network.Stream
}

// NewNetReaderWriter creates a new ReadWriteCloser based on a ReadCloser and a WriteCloser.
func NewNetReaderWriter(stream network.Stream) *NetReaderWriter {
    return &NetReaderWriter{
        stream: stream,
    }
}

// Read like in io.Reader
func (rwc *NetReaderWriter) Read(p []byte) (int, error) {
    return rwc.stream.Read(p)
}

// Write like in io.Writer
func (rwc *NetReaderWriter) Write(p []byte) (int, error) {
    k, err := rwc.stream.Write(p)
    // <== flush here, but how???
    return k, err
}

// Close like in io.Closer - this closes both the reader and writer.
func (rwc *NetReaderWriter) Close() error {
    err := rwc.stream.Close()
    if err != nil {
        return fmt.Errorf("unable to close network stream: %v", err)
    }
    return nil
}
1

There are 1 answers

0
Upperwal On

bufio.ReadWriter has a Flush function as documented here

Here is the code you can refer.

func writeData(rw *bufio.ReadWriter) {
    stdReader := bufio.NewReader(os.Stdin)

    for {
        fmt.Print("> ")
        sendData, err := stdReader.ReadString('\n')
        if err != nil {
            log.Println(err)
            return
        }

        rw.WriteString(fmt.Sprintf("%s\n", sendData))
        rw.Flush()
    }
}

taken from the chat example.