I want to use MongoDB change streams to watch insertion/updates on a first collection to populate, when a condition is meet,another collection with computed values extracted from the watched collection.
Following Mongodb tutorial, I came to the following results:
require('dotenv').config();
const { MongoClient } = require('mongodb');
const stream = require('stream');
const es = require('event-stream');
async function monitorListingsUsingStreamAPI(client, pipeline = []) {
const collection = client
.db(process.env.MONGO_DB)
.collection(process.env.COLLECTION_TO_MONITOR);
const changeStream = collection.watch(pipeline);
const collection_dest = client
.db(process.env.MONGO_DB)
.collection(process.env.COLLECTION_TO_POPULATE);
changeStream.pipe(
es.map(function (doc, next) {
const { _id, ...data } = doc.fullDocument;
const new_doc = { size: data.samples.length, data };
(async () => {
await collection_dest.insertOne(new_doc, next);
})();
}),
);
}
async function main() {
const uri = process.env.MONGO_DB_URI;
const client = new MongoClient(uri, {
useUnifiedTopology: true,
useNewUrlParser: true,
});
try {
// Connect to the MongoDB cluster
await client.connect();
const pipeline = [
{
$match: {
operationType: 'insert',
'fullDocument.samples': { $size: 3 },
},
},
];
// Monitor new listings using the Stream API
await monitorListingsUsingStreamAPI(client, pipeline);
}
}
Actually it seems to work but I used event-stream
to pipe
MongoDB change stream into another one where I used an immediately-invoked anonymous async functions to populate the second collection.
I wonder if this approach is correct? How to use transform streams?