Multer is built on busboy, and is essentially this file:
var is = require('type-is')
var Busboy = require('busboy')
var extend = require('xtend')
var onFinished = require('on-finished')
var appendField = require('append-field')
var Counter = require('./counter')
var MulterError = require('./multer-error')
var FileAppender = require('./file-appender')
var removeUploadedFiles = require('./remove-uploaded-files')
function drainStream (stream) {
stream.on('readable', stream.read.bind(stream))
}
function makeMiddleware (setup) {
return function multerMiddleware (req, res, next) {
if (!is(req, ['multipart'])) return next()
var options = setup()
var limits = options.limits
var storage = options.storage
var fileFilter = options.fileFilter
var fileStrategy = options.fileStrategy
var preservePath = options.preservePath
req.body = Object.create(null)
var busboy
try {
busboy = new Busboy({ headers: req.headers, limits: limits, preservePath: preservePath })
} catch (err) {
return next(err)
}
var appender = new FileAppender(fileStrategy, req)
var isDone = false
var readFinished = false
var errorOccured = false
var pendingWrites = new Counter()
var uploadedFiles = []
function done (err) {
if (isDone) return
isDone = true
req.unpipe(busboy)
drainStream(req)
busboy.removeAllListeners()
onFinished(req, function () { next(err) })
}
function indicateDone () {
if (readFinished && pendingWrites.isZero() && !errorOccured) done()
}
function abortWithError (uploadError) {
if (errorOccured) return
errorOccured = true
pendingWrites.onceZero(function () {
function remove (file, cb) {
storage._removeFile(req, file, cb)
}
removeUploadedFiles(uploadedFiles, remove, function (err, storageErrors) {
if (err) return done(err)
uploadError.storageErrors = storageErrors
done(uploadError)
})
})
}
function abortWithCode (code, optionalField) {
abortWithError(new MulterError(code, optionalField))
}
// handle text field data
busboy.on('field', function (fieldname, value, fieldnameTruncated, valueTruncated) {
if (fieldname == null) return abortWithCode('MISSING_FIELD_NAME')
if (fieldnameTruncated) return abortWithCode('LIMIT_FIELD_KEY')
if (valueTruncated) return abortWithCode('LIMIT_FIELD_VALUE', fieldname)
// Work around bug in Busboy (https://github.com/mscdex/busboy/issues/6)
if (limits && Object.prototype.hasOwnProperty.call(limits, 'fieldNameSize')) {
if (fieldname.length > limits.fieldNameSize) return abortWithCode('LIMIT_FIELD_KEY')
}
appendField(req.body, fieldname, value)
})
// handle files
busboy.on('file', function (fieldname, fileStream, filename, encoding, mimetype) {
// don't attach to the files object, if there is no file
if (!filename) return fileStream.resume()
// Work around bug in Busboy (https://github.com/mscdex/busboy/issues/6)
if (limits && Object.prototype.hasOwnProperty.call(limits, 'fieldNameSize')) {
if (fieldname.length > limits.fieldNameSize) return abortWithCode('LIMIT_FIELD_KEY')
}
var file = {
fieldname: fieldname,
originalname: filename,
encoding: encoding,
mimetype: mimetype
}
var placeholder = appender.insertPlaceholder(file)
fileFilter(req, file, function (err, includeFile) {
if (err) {
appender.removePlaceholder(placeholder)
return abortWithError(err)
}
if (!includeFile) {
appender.removePlaceholder(placeholder)
return fileStream.resume()
}
var aborting = false
pendingWrites.increment()
Object.defineProperty(file, 'stream', {
configurable: true,
enumerable: false,
value: fileStream
})
fileStream.on('error', function (err) {
pendingWrites.decrement()
abortWithError(err)
})
fileStream.on('limit', function () {
aborting = true
abortWithCode('LIMIT_FILE_SIZE', fieldname)
})
storage._handleFile(req, file, function (err, info) {
if (aborting) {
appender.removePlaceholder(placeholder)
uploadedFiles.push(extend(file, info))
return pendingWrites.decrement()
}
if (err) {
appender.removePlaceholder(placeholder)
pendingWrites.decrement()
return abortWithError(err)
}
var fileInfo = extend(file, info)
appender.replacePlaceholder(placeholder, fileInfo)
uploadedFiles.push(fileInfo)
pendingWrites.decrement()
indicateDone()
})
})
})
busboy.on('error', function (err) { abortWithError(err) })
busboy.on('partsLimit', function () { abortWithCode('LIMIT_PART_COUNT') })
busboy.on('filesLimit', function () { abortWithCode('LIMIT_FILE_COUNT') })
busboy.on('fieldsLimit', function () { abortWithCode('LIMIT_FIELD_COUNT') })
busboy.on('finish', function () {
readFinished = true
indicateDone()
})
req.pipe(busboy)
}
}
module.exports = makeMiddleware
In that code they have essentially
abortWithError(() => pendingWrites.onceZero(() => removeFiles()...))
So it seems like it's waiting for all the files to finish writing to disk, before it then removes them, which seems like a waste.
I think having access to the file stream somehow and canceling writing to the disk file stream would save a lot on large files. But I don't know the proper way to cancel the chain of streams.
The streaming starts with:
req.pipe(busboy)
Pipe the request into the busboy parser instance, which will emit file and field values. The file one comes with a stream, which you then use like this:
fileStream.on('error', function (err) {
pendingWrites.decrement()
abortWithError(err)
})
fileStream.on('limit', function () {
aborting = true
abortWithCode('LIMIT_FILE_SIZE', fieldname)
})
const writeStream = fs.createWriteStream('/tmp/blah')
fileStream.pipe(writeStream)
Unfortunately, multer internals doesn't have access to the file writeStream, which is why I think it is written the way it is. So it does abortWithError and then waits for all the writes to finish before moving on.
But if you did have access to the file write stream, what would you do?
fileStream.on('error', function (err) {
// 1. should you close the write stream?
writeStream.close()
// 2. should you instead `destroy` it?
writeStream.destroy()
// 3. would just unpiping work?
// (and the added fileStream.on('unpipe') handler)
fileStream.unpipe(writeStream)
// 4. then delete the file immediately on error after everything is stopped?
fs.unlink('/tmp/blah')
})
const writeStream = fs.createWriteStream('/tmp/blah')
fileStream.on('unpipe', () => {
// 3. do you need to explicitly close the write stream?
writeStream.close()
})
fileStream.pipe(writeStream)
Which one of those approaches (or mixture thereof) is the appropriate solution to stopping streaming the file from the HTTP request, to stopping writing to the tmp file, to removing the tmp file on error?
This is what I ended up with:
With this as the usage: