I'm trying to make a transform stream flow that is taking data from socket.io
, converting it to JSON, and then sending it to stdout. I am totally perplexed as to why data just seems to go right through without any transformation. I'm using the through2
library. Here is my code:
getStreamNames().then(streamNames => {
const socket = io(SOCKETIO_URL);
socket.on('connect', () => {
socket.emit('Subscribe', {subs: streamNames});
});
const stream = through2.obj(function (chunk, enc, callback) {
callback(null, parseString(chunk))
}).pipe(through2.obj(function (chunk, enc, callback) {
callback(null, JSON.stringify(chunk));
})).pipe(process.stdout);
socket.on('m', data => stream.write(data));
},
);
getStreamNames
returns a promise which resolves to an array of stream names (i'm calling an external socket.io
API) and parseString
takes a string returned from the API and converts it to JSON so it's manageable.
What I'm looking for is my console to print out the stringify'd JSON after I parse it using parseString
and then make it stdout-able with JSON.stringify
. What is actually happening is the data is going right through the stream and doing no transformation.
For reference, the data coming from the API is in a weird format, something like
field1~field2~0x23~fieldn
and so that's why I need the parseString
method.
I must be missing something. Any ideas?
EDIT:
parseString:
function(value) {
var valuesArray = value.split("~");
var valuesArrayLenght = valuesArray.length;
var mask = valuesArray[valuesArrayLenght - 1];
var maskInt = parseInt(mask, 16);
var unpackedCurrent = {};
var currentField = 0;
for (var property in this.FIELDS) {
if (this.FIELDS[property] === 0) {
unpackedCurrent[property] = valuesArray[currentField];
currentField++;
}
else if (maskInt & this.FIELDS[property]) {
if (property === 'LASTMARKET') {
unpackedCurrent[property] = valuesArray[currentField];
}
else {
unpackedCurrent[property] = parseFloat(valuesArray[currentField]);
}
currentField++;
}
}
return unpackedCurrent;
};
Thanks
The issue is that the stream you're writing, is actually
process.stdout
, because.pipe
returns the laststream.Writable
, so you can keep chaining, in your case,process.stdout
.So all you were doing was:
process.stdout.write(data)
without going through the pipeline.What you need to do, is assign your first
through2
stream to thestream
variable, and then.pipe
on that stream.