Node.js - optimal way to handle await inside the stream data listener (mbox email parsing)

I'm trying to write a script that counts emails per sender in an mbox file from Gmail Takeout (10 GB, 150k emails), but my result (sendersCount) is logged as empty.

const { MboxStream } = require("node-mbox");
const fs = require("fs");
const simpleParser = require("mailparser").simpleParser;

const mboxFilePath = "gmail.mbox";
const sendersCount = {};

const mailbox = fs.createReadStream(mboxFilePath);
const mbox = MboxStream(mailbox, {});

mbox.on("data", async function (msg) {
  // `msg` is a `Buffer` instance
  console.log("got a message", msg.toString().substring(0, 30));
  const parsed = await simpleParser(msg);
  const sender = parsed.from.value[0].address;
  console.log("sender email", sender);
  sendersCount[sender] = (sendersCount[sender] || 0) + 1;
});

mbox.on("error", function (err) {
  console.log("got an error", err);
});

mbox.on("finish", function () {
  console.log("done reading mbox file");
  console.log(sendersCount); //TODO write to json file
});

I switched to MboxStream after giving up on splitting the fs.createReadStream output manually with a custom delimiter (the last email went missing when I used "\nFrom "). MboxStream, in turn, correctly logged every email with its default line splitter. I then couldn't figure out how to parse the buffer manually, and since I'll need quite a few fields from each email anyway, I decided to use simpleParser, which is async.

It looks like the mbox event listeners (mbox being a Transform stream implementation) are not designed for asynchronous code: the stream does not wait for the awaited calls inside the listener to complete, so the "sender email" logs appear after the "done reading mbox file" log.
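The ordering problem can be reproduced without any mbox file. The following is a minimal sketch that uses only Node's built-in stream module: Readable.from stands in for the mbox stream, and fakeParse is a hypothetical stand-in for simpleParser (both are assumptions made for illustration, not part of the original script).

```javascript
const { Readable } = require("node:stream");

// Hypothetical stand-in for simpleParser: resolves after a short delay.
function fakeParse(msg) {
  return new Promise((resolve) =>
    setTimeout(() => resolve({ sender: msg.toString() }), 20)
  );
}

// Resolves with the order in which the events were observed.
function demoOrdering(messages) {
  return new Promise((resolve) => {
    const events = [];
    const source = Readable.from(messages); // stands in for the mbox stream

    source.on("data", async (msg) => {
      // The emitter ignores the promise returned by this async listener,
      // so the stream keeps flowing while the parse is still pending.
      const parsed = await fakeParse(msg);
      events.push(`parsed ${parsed.sender}`);
    });

    source.on("end", () => {
      events.push("end");
      // Wait for the pending parses to settle before reporting.
      setTimeout(() => resolve(events), 200);
    });
  });
}

demoOrdering(["a@example.com", "b@example.com"]).then((events) =>
  console.log(events)
);
```

With the delays chosen here, "end" is recorded before either "parsed ..." entry, which mirrors the empty sendersCount in the script above.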

The first thing I thought of was collecting the simpleParser promises and calling Promise.all on them, but I suspect that resolving 150k promises could hurt performance in my case, and I need this script to be reasonably fast so I can test it repeatedly on a large data set.
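For reference, the Promise.all idea could look roughly like the sketch below. It is self-contained and uses stand-ins: Readable.from replaces the mbox stream and fakeParse is a hypothetical substitute for simpleParser. Note that this approach applies no backpressure, so with a real parser many messages may be in flight at the same time.

```javascript
const { Readable } = require("node:stream");

// Hypothetical stand-in for simpleParser.
function fakeParse(msg) {
  return new Promise((resolve) =>
    setTimeout(() => resolve({ sender: msg.toString() }), 5)
  );
}

// Collects one promise per message and waits for all of them on "end".
function countWithPromiseAll(source) {
  return new Promise((resolve, reject) => {
    const counts = {};
    const pending = [];
    let firstError = null;

    source.on("data", (msg) => {
      pending.push(
        fakeParse(msg)
          .then((parsed) => {
            counts[parsed.sender] = (counts[parsed.sender] || 0) + 1;
          })
          // Record failures here so no rejection is left unhandled
          // before Promise.all is attached.
          .catch((err) => {
            if (!firstError) firstError = err;
          })
      );
    });

    source.on("error", reject);

    source.on("end", () => {
      Promise.all(pending).then(() =>
        firstError ? reject(firstError) : resolve(counts)
      );
    });
  });
}

countWithPromiseAll(
  Readable.from(["a@example.com", "b@example.com", "a@example.com"])
).then((counts) => console.log(counts));
```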

I looked through similar stream/await questions, but the answers either didn't work for me or were not obvious to apply:

  • I already tried mbox.pause/resume inside the "data" listener, which gets every email except the last one parsed before "finish" fires. I'm not sure why the last one is missed.
  • Another alternative I found is using the pipeline from the stream/promises module, but I'm still trying to figure out how to apply it in my specific case.
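One possible way to apply pipeline from stream/promises is to pass an async function as the final step and consume the stream with for await, so that each message is fully parsed before the next one is read. The sketch below is self-contained and makes two assumptions: Readable.from stands in for the mbox stream, and fakeParse is a hypothetical substitute for simpleParser.

```javascript
const { Readable } = require("node:stream");
const { pipeline } = require("node:stream/promises");

// Hypothetical stand-in for simpleParser.
function fakeParse(msg) {
  return new Promise((resolve) =>
    setTimeout(() => resolve({ sender: msg.toString() }), 5)
  );
}

// Counts senders, awaiting each parse before pulling the next message.
async function countSenders(source) {
  const counts = {};
  await pipeline(source, async function (messages) {
    for await (const msg of messages) {
      const parsed = await fakeParse(msg);
      counts[parsed.sender] = (counts[parsed.sender] || 0) + 1;
    }
  });
  // pipeline resolves only after the consumer above has finished.
  return counts;
}

countSenders(
  Readable.from(["a@example.com", "b@example.com", "a@example.com"])
).then((counts) => console.log(counts));
```

If the MboxStream instance behaves like a standard Node Transform stream, it could presumably be passed to countSenders in place of Readable.from(...), with simpleParser replacing fakeParse. Because the loop pulls one message at a time, this processes messages sequentially, which trades some throughput for bounded memory use.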

I'm also open to alternative suggestions on better ways to parse a large amount of mbox data for stats purposes.

There is 1 answer

Answered by leodevbro

Your code works fine; I tested it. Also, if you don't want to use require('mailparser').simpleParser, you can use require('mailparser').MailParser instead. Going by the official docs, MailParser should be more efficient than simpleParser because it is lower level:

https://nodemailer.com/extras/mailparser/

The module exposes two separate modes, a lower level MailParser class and simpleParser function. The latter is simpler to use (hence the name) but is less resource efficient as it buffers attachment contents in memory.