One of the basic behaviours of Node's streams is to block when writing to a paused stream, and any stream that is not piped anywhere starts out paused. In this example, the created PassThrough is not piped to anything during the event-loop tick in which it is created. One would expect any pipeline run on this PassThrough to block until it is piped (or a 'data' listener is attached), but this is not the case. The pipeline calls back, but nothing is consumed.
const {promises: pFs} = require('fs');
const fs = require('fs');
const {PassThrough} = require('stream');
const {pipeline: pipelineCb} = require('stream');
const util = require('util');
const pipeline = util.promisify(pipelineCb);
const path = require('path');
const assert = require('assert');
/**
 * Create a stream whose final destination is attached asynchronously
 * @param {string} outputPath
 * @return {PassThrough}
 */
function myCreateWritableStream (outputPath) {
// The stream is created in paused mode -> should block until piped
const stream = new PassThrough();
(async () => {
// Do some stuff (create directory / check space / connect...)
await new Promise(resolve => setTimeout(resolve, 500));
console.log('piping passThrough to final output');
// Consume the stream
await pipeline(stream, fs.createWriteStream(outputPath));
console.log('passThrough stream content written');
})().catch(e => {
console.error(e);
stream.emit('error', e);
});
return stream;
}
/**
* Main test function
* @return {Promise<void>}
*/
async function main () {
// Prepare the test directory with a 'tmp1' file only
const smallFilePath = path.join(__dirname, 'tmp1');
const smallFileOut = path.join(__dirname, 'tmp2');
await Promise.all([
pFs.writeFile(smallFilePath, 'a small content'),
pFs.unlink(smallFileOut).catch(e => assert(e.code === 'ENOENT'))
]);
// Duplicate the tmp1 file to tmp2
await pipeline([
fs.createReadStream(smallFilePath),
myCreateWritableStream(smallFileOut)
]);
console.log('pipeline ended');
// Check content
const finalContent = await pFs.readdir(__dirname);
console.log('directory content');
console.log(finalContent.filter(file => file.startsWith('tmp')));
}
main().catch(e => {
process.exitCode = 1;
console.error(e);
});
This code outputs the following lines:
pipeline ended
directory content
[ 'tmp1' ]
piping passThrough to final output
passThrough stream content written
If the pipeline really waited for the stream to end, then the output would instead be:
piping passThrough to final output
passThrough stream content written
pipeline ended
directory content
[ 'tmp1', 'tmp2' ]
How can you explain this behaviour?
I don't think the API gives the guarantees you are looking for here.
The stream.pipeline calls its callback after all data has finished writing. Since the data has been written to a new Transform stream (your PassThrough), and that stream has nowhere to put the data yet, it simply gets stored in the stream's internal buffer. That is good enough for the pipeline.

If you were to read a large enough file, filling the Transform stream's buffer, backpressure would automatically trigger a pause() on the readable that is reading the file. Once the Transform stream drains, it would automatically resume() the readable so that data flow resumes.
I think your example makes two incorrect assumptions:

(1) That you can pause a transform stream. According to the stream docs, pausing any stream that is piped to a destination is ineffective, because it will immediately unpause itself as soon as a piped destination asks for more data. Also, a paused transform stream still reads data! A paused stream just doesn't write data.

(2) That a pause further down a pipeline somehow propagates up to the front of the pipeline and causes data to stop flowing. This is only true if it is caused by backpressure, meaning you would need to trigger Node's detection of a full internal buffer.
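To illustrate that buffer-full condition (the 8-byte highWaterMark below is artificially small, just for the demo): write() returns false once the PassThrough's internal buffer reaches its high-water mark, which is the same signal pipe()/pipeline() react to by pausing the upstream readable:

const {PassThrough} = require('stream');

const pt = new PassThrough({highWaterMark: 8}); // tiny buffer for the demo
console.log(pt.write('12345'));       // true: still under the high-water mark
console.log(pt.write('67890abcdef')); // false: buffer full, upstream should pause
pt.once('drain', () => console.log('drained, safe to write again'));
// resume() is the documented way to fully consume (and discard) a stream's
// data; emptying the buffer lets the pending write complete and emits 'drain'
pt.resume();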
When working with pipes, it's best to assume you have manual control over the two farthest ends, but not necessarily over any of the pieces in the middle. (You can manually pipe() and unpipe() to connect and disconnect intermediate streams, but you can't pause them.)