NodeJS Stream splitting

I have an infinite data stream from a forked process. I want this stream to be processed by a module and sometimes I want to duplicate the data from this stream to be processed by a different module (e.g. monitoring a data stream but if anything interesting happens I want to log the next n bytes to file for further investigation).

So let's suppose the following scenario:

  1. I start the program and start consuming the readable stream
  2. 2 secs later I want to process the same data for 1 sec by a different stream reader
  3. Once the time is up I want to close the second consumer but the original consumer must stay untouched.

Here is a code snippet for this:

const PassThrough = require('stream').PassThrough;

var stream = process.stdout; // the source stream (the data coming from the forked process)
var stream2;

stream.pipe(detector); // Using the first consumer

function startAnotherConsumer() {
    stream2 = new PassThrough();
    stream.pipe(stream2);

    // use stream2 somewhere else
}

function stopAnotherConsumer() {
    stream.unpipe(stream2);
}
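
To make the timing concrete: in the scenario above, the start/stop calls would simply be driven by timers, roughly like this sketch (the 2 sec / 1 sec delays are the ones from the list):

setTimeout(function () {
    startAnotherConsumer();                // 2 secs in: start duplicating the data

    setTimeout(stopAnotherConsumer, 1000); // 1 sec later: tear the second consumer down
}, 2000);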

My problem here is that unpiping stream2 doesn't close it. If I call stream2.end() right after the unpipe call, the program crashes with the following error:

events.js:160
      throw er; // Unhandled 'error' event
      ^

Error: write after end
    at writeAfterEnd (_stream_writable.js:192:12)
    at PassThrough.Writable.write (_stream_writable.js:243:5)
    at Socket.ondata (_stream_readable.js:555:20)
    at emitOne (events.js:101:20)
    at Socket.emit (events.js:188:7)
    at readableAddChunk (_stream_readable.js:176:18)
    at Socket.Readable.push (_stream_readable.js:134:10)
    at Pipe.onread (net.js:548:20)

I even tried pausing the source stream to give the second stream a chance to flush its buffer, but that didn't work either:

function stopAnotherConsumer() {
    stream.pause();
    stream2.once('unpipe', function () {
        stream.resume();
        stream2.end();
    });
    stream.unpipe(stream2);
}

This produces the same error as before (write after end).

How can I solve this? My original intent is to duplicate the streamed data from a given point in time, then close the second stream after a while.

Note: I tried to use this answer to make it work.

1 Answer

Andras answered:

Since there were no answers, I'm posting my own (patchwork) solution. If anyone has a better one, please don't hold it back.

A new Transform stream:

const Writable = require('stream').Writable;
const Transform = require('stream').Transform;

class DuplicatorStream extends Transform {
    constructor(options) {
        super(options);

        // Optional second consumer; while attached, every chunk is copied to it
        this.otherStream = null;
    }

    attachStream(stream) {
        if (!(stream instanceof Writable)) {
            throw new Error('DuplicatorStream argument is not a writable stream!');
        }

        if (this.otherStream) {
            throw new Error('A stream is already attached!');
        }

        this.otherStream = stream;
        this.emit('attach', stream);
    }

    detachStream() {
        if (!this.otherStream) {
            throw new Error('No stream to detach!');
        }

        let stream = this.otherStream;
        this.otherStream = null;
        this.emit('detach', stream);
    }

    _transform(chunk, encoding, callback) {
        // Copy each chunk to the attached stream (if any), then pass it on unchanged.
        // Note: the attached stream's backpressure (write() return value) is ignored here.
        if (this.otherStream) {
            this.otherStream.write(chunk);
        }

        callback(null, chunk);
    }
}

module.exports = DuplicatorStream;

And the usage:

const PassThrough = require('stream').PassThrough;
const DuplicatorStream = require('./DuplicatorStream'); // assuming the class above lives in DuplicatorStream.js

var stream = process.stdout;
var stream2;

var duplicatorStream = new DuplicatorStream();
stream.pipe(duplicatorStream);   // Inserting my duplicator stream in the chain
duplicatorStream.pipe(detector); // Using the first consumer

function startAnotherConsumer() {
    stream2 = new PassThrough();
    duplicatorStream.attachStream(stream2);

    // use stream2 somewhere else
}

function stopAnotherConsumer() {
    duplicatorStream.once('detach', function () {
        stream2.end();
    });
    duplicatorStream.detachStream();
}
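
To tie this back to the scenario from the question (start duplicating 2 secs in, stop 1 sec later, and dump the copied bytes to a file for further investigation), the calls can be scheduled with plain timers. The file name and the use of fs.createWriteStream below are only illustrative:

const fs = require('fs');

setTimeout(function () {
    startAnotherConsumer();
    // e.g. log the duplicated data to a file for further investigation
    stream2.pipe(fs.createWriteStream('dump.log'));

    setTimeout(stopAnotherConsumer, 1000); // detach and end the second consumer 1 sec later
}, 2000);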