I want to pause my Cassandra stream for some async operations before processing the next row.
Each row is received in a 'readable' event listener. I have tried using stream.pause(), but it does not actually pause the stream. I have also tried the same in the 'data' event listener, and that doesn't work either. Making the 'readable' handler async and awaiting the async function does not prevent the next row from arriving before the function finishes. I would be extremely grateful for any insights and, perhaps, a resolution. Here is my code:
    function start() {
        let stream = client.stream('SELECT * FROM table');
        stream
            .on('end', function () {
                console.log(`Ended at ${Date.now()}`);
            })
            .on('error', function (err) {
                console.error(err);
            })
            .on('readable', function () {
                let row = this.read();
                asyncFunctionNeedTowaitForthisBeforeNextRow();
            });
    }
    // The below doesn't work
    function start() {
        let stream = client.stream('SELECT * FROM table');
        stream
            .on('end', function () {
                console.log(`Ended at ${Date.now()}`);
            })
            .on('error', function (err) {
                console.error(err);
            })
            .on('readable', async function () {
                let row = this.read();
                stream.pause();
                await asyncFunctionNeedTowaitForthisBeforeNextRow();
                stream.resume();
            });
    }
stream.pause() doesn't work because the 'readable' event fires multiple times, so the same async function is called again before the previous call has finished. The same goes for the 'data' event.
I would recommend using a custom writable stream to handle all of this async work properly.
The writable stream will look something like:
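A minimal sketch of such a writable, assuming Node's built-in stream module. The factory name createRowWriter is made up for illustration, and asyncFunctionNeedTowaitForthisBeforeNextRow is stubbed with a timer so the snippet runs on its own. The key point is that a writable does not receive the next row until the callback of the current write has been called:

```javascript
const { Writable } = require('stream');

// Hypothetical stand-in for the real async work on a row.
function asyncFunctionNeedTowaitForthisBeforeNextRow(row) {
    return new Promise((resolve) => setTimeout(resolve, 10));
}

// Builds an object-mode writable that handles one row at a time.
function createRowWriter(processRow = asyncFunctionNeedTowaitForthisBeforeNextRow) {
    return new Writable({
        objectMode: true,   // rows are objects, not buffers
        highWaterMark: 1,   // buffer as few rows as possible
        write(row, _encoding, callback) {
            // The next write() is not issued until callback() runs,
            // so rows are processed strictly one after another.
            Promise.resolve()
                .then(() => processRow(row))
                .then(() => callback(), callback);
        },
    });
}
```

Passing the error to callback makes the writable emit 'error', so a failed row stops the processing instead of being silently skipped.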
Then adjust your code to use this writable:
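A rough, self-contained sketch of the adjusted start function, assuming the stream returned by client.stream() is an ordinary object-mode readable that can be piped. The source and processRow parameters are additions for illustration, and the demo at the bottom uses an in-memory Readable.from source instead of a live Cassandra connection:

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical stand-in for the real async work on a row.
function asyncFunctionNeedTowaitForthisBeforeNextRow(row) {
    return new Promise((resolve) => setTimeout(resolve, 10));
}

// `source` is any object-mode readable, for example
// client.stream('SELECT * FROM table').
function start(source, processRow = asyncFunctionNeedTowaitForthisBeforeNextRow) {
    return new Promise((resolve, reject) => {
        // One row at a time: the next row is delivered only after callback().
        const writer = new Writable({
            objectMode: true,
            highWaterMark: 1,
            write(row, _encoding, callback) {
                Promise.resolve()
                    .then(() => processRow(row))
                    .then(() => callback(), callback);
            },
        });

        source.on('error', reject);
        writer.on('error', reject);
        // 'finish' fires once every row has been processed.
        writer.on('finish', () => {
            console.log(`Ended at ${Date.now()}`);
            resolve();
        });

        // pipe() applies backpressure, pausing the source while a row is in progress.
        source.pipe(writer);
    });
}

// Demo with in-memory rows in place of a Cassandra result stream.
start(Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }]));
```

Note that the end of processing is now signalled by the writable's 'finish' event rather than the readable's 'end' event, because the last row may still be in progress when the source ends.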