How to use MongoDB change streams in Node.js to populate a new collection

I want to use MongoDB change streams to watch inserts/updates on a first collection and, when a condition is met, populate a second collection with computed values extracted from the watched collection.
Following the MongoDB tutorial, I came up with the following:

require('dotenv').config();
const { MongoClient } = require('mongodb');
const stream = require('stream');
const es = require('event-stream');

async function monitorListingsUsingStreamAPI(client, pipeline = []) {
  // Collection whose inserts are watched
  const collection = client
    .db(process.env.MONGO_DB)
    .collection(process.env.COLLECTION_TO_MONITOR);

  // Open a change stream filtered by the aggregation pipeline
  const changeStream = collection.watch(pipeline);

  // Collection that receives the computed documents
  const collection_dest = client
    .db(process.env.MONGO_DB)
    .collection(process.env.COLLECTION_TO_POPULATE);

  changeStream.pipe(
    es.map(function (doc, next) {
      // Drop the _id and derive the new document from the inserted one
      const { _id, ...data } = doc.fullDocument;
      const new_doc = { size: data.samples.length, data };
      // insertOne calls next() once the write has completed
      (async () => {
        await collection_dest.insertOne(new_doc, next);
      })();
    }),
  );
}

async function main() {
 
  const uri = process.env.MONGO_DB_URI;
  const client = new MongoClient(uri, {
    useUnifiedTopology: true,
    useNewUrlParser: true,
  });

  try {
    // Connect to the MongoDB cluster
    await client.connect();
    const pipeline = [
      {
        $match: {
          operationType: 'insert',
          'fullDocument.samples': { $size: 3 },
        },
      },
    ];

    //  Monitor new listings using the Stream API
    await monitorListingsUsingStreamAPI(client, pipeline);
  } catch (err) {
    console.error(err);
  }
}

main();
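
For context, an insert event delivered by the change stream looks roughly like this (resume token and timestamps trimmed, namespace names are just placeholders), which is why the $match stage filters on fullDocument.samples and why the es.map callback destructures doc.fullDocument:

{
  _id: { _data: '...' },                 // resume token
  operationType: 'insert',
  ns: { db: 'my_db', coll: 'my_collection' },
  documentKey: { _id: ObjectId('...') },
  fullDocument: { _id: ObjectId('...'), samples: [ /* the inserted samples */ ] }
}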

It seems to work, but I had to rely on event-stream to pipe the MongoDB change stream into a second stream, and inside es.map I used an immediately-invoked anonymous async function to populate the second collection.

Is this approach correct? How would I do the same thing with a transform stream?
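
For reference, here is a sketch of what I imagine the transform-stream version could look like. The helper names (makeInsertTransform, monitorListingsUsingTransformStream) are mine, and I am assuming Node 10+ (for util.promisify(stream.pipeline)) and a driver version where changeStream.stream() returns a Readable of change events:

const { Transform, pipeline } = require('stream');
const { promisify } = require('util');

const pipelineAsync = promisify(pipeline);

// Build a Transform that turns each change event into a computed document
// and writes it to the destination collection.
function makeInsertTransform(collection_dest) {
  return new Transform({
    objectMode: true, // change events are plain objects, not buffers
    transform(changeEvent, _encoding, callback) {
      const { _id, ...data } = changeEvent.fullDocument;
      const new_doc = { size: data.samples.length, data };
      // Signal completion only after the insert has finished so that
      // back-pressure propagates from the destination collection.
      collection_dest
        .insertOne(new_doc)
        .then(() => callback())
        .catch(callback);
    },
  });
}

async function monitorListingsUsingTransformStream(client, watchPipeline = []) {
  const db = client.db(process.env.MONGO_DB);
  const changeStream = db
    .collection(process.env.COLLECTION_TO_MONITOR)
    .watch(watchPipeline);
  const collection_dest = db.collection(process.env.COLLECTION_TO_POPULATE);

  // A change stream never ends on its own, so awaiting the pipeline
  // mainly serves to surface errors from either side.
  await pipelineAsync(
    changeStream.stream(),
    makeInsertTransform(collection_dest),
  );
}

Compared with the es.map version above, this avoids the immediately-invoked async function: the insert's promise settles the transform callback, and pipeline() forwards errors from both the change stream and the transform.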
