How to keep a MongoDB change stream running continuously in Node.js?


I am using Mongoose version 7.1.0 in a Node.js application. I have set up a change stream on a MongoDB collection to listen for insert operations. When an insert operation occurs, I send a notification to certain users.

Here is the relevant part of my code:

let resumeTokenRis;

const ris = mongoose.connection.db.collection("ris");
const changeStreamRis = ris.watch([], { resumeAfter: resumeTokenRis });

changeStreamRis.on("change", async (change) => {
  if (change.operationType === "insert") {
    resumeTokenRis = change._id;
    console.log("RI Inserted");

    const ri = change.fullDocument;
    let userIds = ri.issueStat.taggedUser?.map(userId => userId.toString()) || [];
    userIds = [...new Set(userIds)].map(id => new ObjectId(id));

    const userTokens = await NotificationToken.find({ userId: { $in: userIds } });
    const user = await User.findById(ri.user);

    let tokens = userTokens.flatMap(userToken => userToken.tokens);

    if (tokens.length === 0) {
      console.log("No tokens found for this user.");
      return;
    }

    const message = {
      notification: {
        title: `New RI Assignment`,
        body: `Assigned by ${user.name} with ${ri.priority} priority.`,
      },
      data: {
        id: `${ri.refId}`,
        type: "RI",
      },
      tokens: tokens,
    };

    admin.messaging().sendEachForMulticast(message)
      .then((response) => {
        console.log(response);
        response.responses.forEach((res, index) => {
          if (!res.success) {
            const failedToken = tokens[index];
            console.log(`Failed token ID: ${failedToken}`);

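            // Delete the outdated token from whichever document holds it.
            // (Filtering by userId with updateOne only touches the first
            // matching document, which may not contain this token.)
            NotificationToken.updateMany(
              { tokens: failedToken },
              { $pull: { tokens: failedToken } }
            )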
              .then(() => console.log(`Token ${failedToken} deleted successfully`))
              .catch((error) => console.error(`Failed to delete token ${failedToken}: ${error}`));
          }
        });
      })
      .catch((error) => {
        console.log(error);
      });
  }
});

The problem I'm facing is that the change stream sometimes stops listening for changes. I suspect this might be due to inactivity, but I'm not sure. There are no error messages in my console when this happens.

I want the change stream to keep running continuously, even if there are no changes for a long time. How can I keep the change stream continuously running? Any help would be appreciated.
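
For reference, the code above calls watch() only once and registers no "error" or "close" listeners, so resumeTokenRis is stored but never used to reopen the stream. One possible way to keep a stream alive is to reopen it whenever it errors or closes, resuming from the last token seen. The following is only a sketch under assumptions: watchWithResume, retryDelayMs, and the returned stop() and resumeToken members are hypothetical names, not part of Mongoose or the MongoDB driver. It relies only on collection.watch(), the change stream's "change", "error", and "close" events, and its close() method.

```javascript
// Hypothetical helper (not a driver API): keeps a change stream open by
// reopening it after "error" or "close", resuming from the last token seen.
function watchWithResume(collection, pipeline, onChange, { retryDelayMs = 1000 } = {}) {
  let resumeToken;       // _id of the most recent change event
  let stream;            // the currently active change stream
  let retryTimer = null; // non-null while a reopen is pending
  let stopped = false;

  function open() {
    retryTimer = null;
    if (stopped) return;
    const options = resumeToken ? { resumeAfter: resumeToken } : {};
    const current = collection.watch(pipeline, options);
    stream = current;

    current.on("change", (change) => {
      // Record the token for every event type, not only inserts.
      // It is saved before the handler finishes, so an event whose
      // handler fails is not replayed after a restart.
      resumeToken = change._id;
      // The executor runs synchronously; synchronous throws and async
      // rejections from onChange both end up in the catch below.
      new Promise((resolve) => resolve(onChange(change))).catch((err) =>
        console.error("Change handler failed:", err)
      );
    });
    current.on("error", (err) => {
      console.error("Change stream error:", err);
      scheduleReopen(current);
    });
    current.on("close", () => scheduleReopen(current));
  }

  function scheduleReopen(failed) {
    // Ignore events from streams already replaced, and duplicate triggers.
    if (stopped || retryTimer !== null || failed !== stream) return;
    retryTimer = setTimeout(open, retryDelayMs);
    failed.close().catch(() => {}); // best effort; it may already be closed
  }

  open();

  return {
    get resumeToken() { return resumeToken; },
    stop() {
      stopped = true;
      clearTimeout(retryTimer);
      return stream.close();
    },
  };
}
```

With the collection from the question, this could be called as watchWithResume(ris, [], handleChange), where handleChange is a hypothetical function holding the body of the existing "change" listener. Two caveats: if the stored token has already aged out of the oplog, resuming will keep failing, so a fuller version would fall back to opening a fresh stream; and the token lives only in memory, so it would need to be persisted to survive a process restart.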
