Multiple DB Connections are getting Created In MongoDb

182 views Asked by At

I have to do real time changes in my site. I'm using Nest.js and server sent Events and change streams for real time changes.

Client Will call My SSE API passing some required information in query params and Authorization Header. What I'm doing in my SSE API is on change in mongoDB collection(with watch method of change stream), will pass specific data user wise.

In my service I have set Watch on 2 collections and in SSE API doing operation ON CHANGE of mongo watch instance.

**ISSUE: ** Multiple DB connections are getting created when I check in Atlas. Example: Lets say User 1 changes some data, and User 2,3,4,5,6 are already on page and connected to Server sent events API so they are gonna receive real time change for this.When they receive data there is sudden increase in number of connections

But I do have initialized/Open My change Stream Globally and Not in SSE API, also I do know every Collection Watch creates separate DB connection. But I have 2 collections to watch only so why it keeps creating collection on change.There is only listener in Sse api that in on('change')

Below is the code I have tried with

controller.ts


  @Sse('events')
  function(@Req() req): Observable<any> {
    
    const subject$ = new Subject();
    this.sseService.commonEventEmitter(req, subject$);
    return subject$.pipe(
      map((data: any) => ({ data, type: data.fetchTasks.event }))
    );
  }

sse.service.ts


@Injectable()
export class ServerSentEventsService implements OnApplicationShutdown {
  constructor(
    //injected model and service here
  ) {}

  collectionWatcher=this.collectionName.watch([],{fullDocument:'updateLookup'})

  commonEventEmitter(req, subject$): void {
   
    this.collectionWatcher.on('change', (next: any) => {
     //Operations I have to do on change
   }
  }

  
  onApplicationShutdown(signal: string) {
    console.log(`Application closes`);
    this.collectionWatcher.close();
  }

  
}
1

There are 1 answers

2
hemantsingh21 On

First, create a singleton service that initializes and manages the MongoDB change stream for your collections. This service should be responsible for creating a single global change stream instance that all users can use to listen for changes

// mongo-change-stream.service.ts
import { Injectable } from '@nestjs/common';
import { MongoClient } from 'mongodb';

@Injectable()
export class MongoChangeStreamService {
  private client: MongoClient;
  private changeStream: any;

  constructor() {
    // Initialize MongoDB connection and change stream here
    this.client = new MongoClient('mongodb://your-mongodb-uri', {
      useNewUrlParser: true,
      useUnifiedTopology: true,
    });
    this.client.connect().then(() => {
      const db = this.client.db('your-db-name');
      this.changeStream = db.collection('your-collection-name').watch([], {
        fullDocument: 'updateLookup',
      });

      this.changeStream.on('change', (next: any) => {
        // Emit the change to all connected SSE clients
        // You can use an EventEmitter or a Subject to notify SSE clients
        // Example:
        // this.sseEmitter.emit('change', next);
      });
    });
  }

  getChangeStream() {
    return this.changeStream;
  }
}

Inject the Singleton Service in Your Controller: Now, inject the MongoChangeStreamService into your SSE controller and use the single change stream instance it provides for all connected SSE clients.

// sse.controller.ts
import { Sse } from '@nestjs/common';
import { Controller, Get, Req } from '@nestjs/common';
import { Request } from 'express';
import { Subject } from 'rxjs';

import { MongoChangeStreamService } from './mongo-change-stream.service';

@Controller()
export class SSEController {
  constructor(private readonly mongoChangeStreamService: MongoChangeStreamService) {}

  @Get('events')
  @Sse('events')
  function(@Req() req: Request): Subject<any> {
    const subject$ = new Subject();
    // Register the SSE client to listen to the global change stream
    const changeStream = this.mongoChangeStreamService.getChangeStream();
    changeStream.on('change', (next: any) => {
      // Send the change to the connected SSE client
      subject$.next({ data: next, type: next.fetchTasks.event });
    });
    return subject$;
  }
}

By following this approach, you ensure that you have a single global change stream for your collections, and all connected SSE clients can listen to changes from this stream without creating multiple database connections. This should help reduce the number of connections to your MongoDB server when multiple users are connected to your SSE API.