I am piping the response from a Node request into a transform stream using through2Concurrent. This response comes in as a buffer and is parsed into objects using JSONStream. That then gets piped into my transform stream. The transform stream function then makes an HTTP request, formats the response and stores it in MongoDB. We are using concurrent streams because it would take an unacceptably long time to handle everything otherwise.
response Stream -> JSONStream.parse() -> Transform Stream
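To reproduce the shape of this pipeline without the real services, a minimal sketch using only Node's core stream module might look like the following. Everything in it is a stand-in: `runPipeline`, the generated source and the `setImmediate` delay are hypothetical and take the place of `broker.getInstances()`, `JSONStream.parse()` and the HTTP/MongoDB work. It counts objects before and after the slow stage, which is one way to see where objects stop flowing.

```javascript
const { Readable, Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical harness: pushes `total` objects through a slow stage and counts them.
async function runPipeline(total) {
  const counts = { parsed: 0, handled: 0 };

  // Stand-in for the response stream after parsing: emits `total` objects.
  const source = Readable.from(
    Array.from({ length: total }, (_, i) => ({ id: i }))
  );

  // Counts every object that reaches the transform stage.
  const countParsed = new Transform({
    objectMode: true,
    transform(doc, _enc, cb) {
      counts.parsed += 1;
      cb(null, doc);
    },
  });

  // Stand-in for the slow work (HTTP request plus database save).
  const slowTransform = new Transform({
    objectMode: true,
    transform(doc, _enc, cb) {
      setImmediate(() => cb(null, doc));
    },
  });

  // Consumes the output and counts every object that finished processing.
  const countHandled = new Writable({
    objectMode: true,
    write(_doc, _enc, cb) {
      counts.handled += 1;
      cb();
    },
  });

  // Resolves once every stage has finished, rejects if any stage errors.
  await pipeline(source, countParsed, slowTransform, countHandled);
  return counts;
}
```

In the real pipeline, a gap between two such counters would point at the stage where objects are lost.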
Problem Description
The initial response stream contains roughly 18,000 objects once parsed. However, the stream terminates and a finish event is received before all 18,000 objects are handled. No error is thrown, but only about 2,000 to 5,000 objects are actually handled before the stream ends. The exact number handled varies.
Here is the relevant code:
const analyticsTransformer = through2Concurrent.obj({
  maxConcurrency: 15
}, async (doc, enc, cb) => {
  // Make an http request. This is a relatively long request.
  const res = await apim.getAnalytics(doc);
  // Save response to mongo.
  await UsageData.save(res);
  cb();
});
// Kick off the streaming.
broker.getInstances()
  .pipe(JSONStream.parse('*'))
  .pipe(analyticsTransformer)
  .on('finish', () => {
    // We reach this way too quickly before we have handled all 18,000 objects
  })
  .on('error', err => {
    // No errors are caught.
  });
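One thing worth noting about the chain above: `.pipe()` does not forward errors from earlier streams, so an `error` handler on the last stream only sees errors raised by that last stream. Whether that is what happens here is not established. The sketch below only illustrates the behaviour with core streams. `failingSource`, `sink`, `withPipe` and `withPipeline` are hypothetical names, and the failing source stands in for an upstream response that breaks partway through.

```javascript
const { Readable, Writable, pipeline } = require('stream');

// Hypothetical source: emits one object, then fails like a dropped connection.
function failingSource() {
  return new Readable({
    objectMode: true,
    read() {
      this.push({ id: 1 });
      this.destroy(new Error('upstream failed'));
    },
  });
}

// Hypothetical sink that simply discards what it receives.
function sink() {
  return new Writable({
    objectMode: true,
    write(_doc, _enc, cb) {
      cb();
    },
  });
}

// With .pipe(), records which stream actually emits the 'error' event.
function withPipe() {
  return new Promise((resolve) => {
    const seen = { source: false, last: false };
    const source = failingSource();
    const last = sink();
    source.on('error', () => { seen.source = true; });
    last.on('error', () => { seen.last = true; });
    source.pipe(last);
    // 'close' follows 'error' on the source; wait one more turn before reporting.
    source.on('close', () => setImmediate(() => resolve(seen)));
  });
}

// With stream.pipeline(), an error from any stage reaches the single callback.
function withPipeline() {
  return new Promise((resolve) => {
    pipeline(failingSource(), sink(), (err) => resolve(err));
  });
}
```

In `withPipe()` only the handler on the source runs, while `withPipeline()` resolves with the upstream error, so wiring the real streams through `pipeline` (or adding an `error` handler to each stage) may reveal a failure that the chain above hides.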
What I Have Tried
- Waiting for an 'end' event: Same result. Unhandled objects and early termination.
- Using through2 (not through2Concurrent): Receive ETIMEOUT after several thousand objects have come through.
- Setting the highWaterMark to 18,000: This is the only thing that has worked. I can handle all of the objects if I change this highWaterMark value, but this is really just a band-aid on the problem. I want to know why this works and what I can do to fix my streaming problems in a robust way.
Setting the highWaterMark looks like this:
const analyticsTransformer = through2Concurrent.obj({
  highWaterMark: 18000,
  maxConcurrency: 15
}, async (doc, enc, cb) => {
  // ...
});
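For context on what this option controls: in Node's core streams, `highWaterMark` is the buffer threshold at which a stream starts signalling backpressure, and in object mode it counts objects (the default is 16). A value as large as the whole result set therefore lets nearly everything be buffered without the upstream being asked to pause. Whether that pause is what ends the stream early here is an assumption, not something the code above proves. The sketch below uses a hypothetical helper, `writesBeforeBackpressure`, to show only the buffering behaviour.

```javascript
const { Writable } = require('stream');

// Counts how many objects write() accepts before it returns false,
// which is how a writable stream signals backpressure to its producer.
function writesBeforeBackpressure(highWaterMark) {
  const slow = new Writable({
    objectMode: true,
    highWaterMark,
    // Never completes a write, simulating a consumer far slower than the producer.
    write(_doc, _enc, _cb) {},
  });

  let accepted = 0;
  while (slow.write({ id: accepted })) {
    accepted += 1;
  }
  return accepted;
}

// Compare a small threshold with one sized for the full result set.
const small = writesBeforeBackpressure(16);
const large = writesBeforeBackpressure(18000);
console.log({ small, large });
```

With the larger threshold the producer is told to keep writing for far longer, which is consistent with the workaround changing the behaviour, at the cost of holding the objects in memory.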
Why does changing the highWaterMark value work?
What is the real cause of my stream terminating early?
How can I fix it?
Thanks in advance to anyone that can help! :)