Data not being transformed Node.js Transform streams

711 views Asked by At

I'm trying to make a transform stream flow that is taking data from socket.io, converting it to JSON, and then sending it to stdout. I am totally perplexed as to why data just seems to go right through without any transformation. I'm using the through2 library. Here is my code:

getStreamNames().then(streamNames => {
        const socket = io(SOCKETIO_URL);
        socket.on('connect', () => {
            socket.emit('Subscribe', {subs: streamNames});
        });

        const stream = through2.obj(function (chunk, enc, callback) {
            callback(null, parseString(chunk))
        }).pipe(through2.obj(function (chunk, enc, callback) {
            callback(null, JSON.stringify(chunk));
        })).pipe(process.stdout);

        socket.on('m', data => stream.write(data));

    },
);

getStreamNames returns a promise which resolves to an array of stream names (i'm calling an external socket.io API) and parseString takes a string returned from the API and converts it to JSON so it's manageable.

What I'm looking for is my console to print out the stringify'd JSON after I parse it using parseString and then make it stdout-able with JSON.stringify. What is actually happening is the data is going right through the stream and doing no transformation.

For reference, the data coming from the API is in a weird format, something like

field1~field2~0x23~fieldn

and so that's why I need the parseString method.

I must be missing something. Any ideas?

EDIT:

parseString:

function(value) {
    var valuesArray = value.split("~");
    var valuesArrayLenght = valuesArray.length;
    var mask = valuesArray[valuesArrayLenght - 1];
    var maskInt = parseInt(mask, 16);
    var unpackedCurrent = {};
    var currentField = 0;
    for (var property in this.FIELDS) {
        if (this.FIELDS[property] === 0) {
            unpackedCurrent[property] = valuesArray[currentField];
            currentField++;
        }
        else if (maskInt & this.FIELDS[property]) {
            if (property === 'LASTMARKET') {
                unpackedCurrent[property] = valuesArray[currentField];
            }
            else {
                unpackedCurrent[property] = parseFloat(valuesArray[currentField]);
            }
            currentField++;
        }
    }

    return unpackedCurrent;
};

Thanks

1

There are 1 answers

2
Marcos Casagrande On BEST ANSWER

The issue is that the stream you're writing, is actually process.stdout, because .pipe returns the last stream.Writable, so you can keep chaining, in your case, process.stdout.

const x = stream.pipe(stream2).pipe(stream3).pipe(process.stdout);
x === process.stdout // true

So all you were doing was: process.stdout.write(data) without going through the pipeline.

What you need to do, is assign your first through2 stream to the stream variable, and then .pipe on that stream.

const stream = through2.obj((chunk, enc, callback) => {
    callback(null, parseString(chunk))
});

stream
    .pipe(through2.obj((chunk, enc, callback) => {
        callback(null, JSON.stringify(chunk));
    }))
    .pipe(process.stdout);

socket.on('m', data => stream.write(data));