I have a Readable that pipes into a WritableStream (https://github.com/fb55/htmlparser2?tab=readme-ov-file#usage-with-streams) from one library, which provides a SAX interface for parsing data. I want to pipe the SAX stream into a new stream that takes the parsed data and performs buffered inserts into a database (using async/await).
The objective is the following:
- Get file stream from S3
- Parse with Writable Stream from htmlparser2
- During parsing push chunks of data to Kafka
Here's what I managed to do without using pipes:
const content: Readable = getFileFromS3()
const chunkSize: number = 50
const handler = async (data) => ...
const writableKafkaStream = new KafkaWritableStream(chunkSize, handler)

let count = 0 // number of records written, returned via the promise below
const currentParsingObject = {}
const parser = new SaxWritableStream(
  {
    onopentag(name, attributes) {
      // ... construct data based on opened tag
      currentParsingObject.someProp = name
    },
    ontext(text) {
      // ... construct data based on text
      currentParsingObject.text = text
    },
    onclosetag(tagname) {
      // ... finish constructing data and write to a stream
      currentParsingObject.someProp2 = tagname
      // sending data to another stream
      writableKafkaStream.write(currentParsingObject)
    },
  },
  { xmlMode: true },
)

return new Promise(resolve => {
  content.pipe(parser).on('finish', () => {
    writableKafkaStream.end()
    resolve(count)
  })
})
But it looks like this overflows writableKafkaStream, because nothing pauses the original stream while writableKafkaStream is still busy with its writes.
I am looking for a way to pipe into writableKafkaStream. Since SaxWritableStream only triggers event callbacks (ontext, onopentag, etc.), I'm not sure how to direct the parsed data into another stream through piping.
EDIT:
I have added some pauses to the original stream when the write returns false:
onclosetag(tagname) {
  if (shouldAddToWritableKafkaStream) {
    if (!writableKafkaStream.write(data)) {
      content.pause()
      writableKafkaStream.once('drain', () => {
        content.resume()
      })
    }
    count += 1
  }
},
It seems to work (memory consumption stays low), but it produces lots of warnings:
(node:26490) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 drain listeners added to [KafkaTransporterTransformStream]. Use emitter.setMaxListeners() to increase limit
You use the throttling mechanism "if `write` returns false then `pause` and `resume` on `drain`". But this throttles only the chunks being written to the SAX `parser`, and even one chunk can be so big that it leads to many `writableKafkaStream.write` operations, which may still overflow Kafka. This also explains the warnings about too many `drain` handlers that you observe.

I propose a different throttling mechanism which ensures that a write operation to Kafka finishes before the next one starts. This requires an asynchronous `writeToKafka` function which awaits the callback from the `writableKafkaStream.write` operation (assuming this callback happens only after Kafka has received the data):
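A minimal sketch of such a function, relying on the standard Node.js `write(chunk, callback)` signature (the exact error handling is an assumption):

```typescript
// Resolves once writableKafkaStream has confirmed the record via its write
// callback (assumed to fire only after Kafka has received the data).
function writeToKafka(data: unknown): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    writableKafkaStream.write(data, (err?: Error | null) => {
      if (err) reject(err)
      else resolve()
    })
  })
}
```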
Using this function, the synchronous `onclosetag` handler can maintain a `queue` of promises which makes the writing to Kafka asynchronous. To also pause the original stream, pipe your `content` into a `Writable` stream that passes it on to the SAX `parser` only after the previous `queue` has completed:
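Here is a sketch of how these two pieces could fit together; the wrapper name `throttledParser` and the exact error handling are assumptions, and the SAX handlers are abbreviated:

```typescript
import { Writable } from 'node:stream'

// Queue that chains all pending Kafka writes, one after another.
let queue: Promise<void> = Promise.resolve()

const parser = new SaxWritableStream(
  {
    // ... onopentag and ontext as before
    onclosetag(tagname) {
      // ... finish constructing data as before
      if (shouldAddToWritableKafkaStream) {
        // Chain the write onto the queue instead of writing directly.
        queue = queue.then(() => writeToKafka(data))
        count += 1
      }
    },
  },
  { xmlMode: true },
)

// Pass-through Writable: each chunk reaches the SAX parser only after every
// Kafka write queued so far has completed. Because the callback is delayed,
// back-pressure propagates through pipe() and pauses the S3 stream.
const throttledParser = new Writable({
  async write(chunk, _encoding, callback) {
    try {
      await queue
      parser.write(chunk)
      callback()
    } catch (err) {
      callback(err as Error)
    }
  },
  final(callback) {
    // Flush the parser; once it has finished, wait for any writes that the
    // final flush queued, then signal completion.
    parser.end(() => {
      queue.then(() => callback()).catch(callback)
    })
  },
})

return new Promise(resolve => {
  content.pipe(throttledParser).on('finish', () => {
    writableKafkaStream.end()
    resolve(count)
  })
})
```

With this setup the S3 stream is read only as fast as Kafka confirms the writes, so the manual `pause()`/`resume()` and the extra `drain` listeners are no longer needed.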