Can we use server sent events in nestjs without using interval?

12.1k views Asked by At

I'm creating few microservices using nestjs.

For instance I have x, y & z services all interconnected by grpc but I want service x to send updates to a webapp on a particular entity change so I have considered server-sent-events [open to any other better solution].

Following the nestjs documentation, they have a function running at n interval for sse route, seems to be resource exhaustive. Is there a way to actually sent events when there's a update.

Lets say I have another api call in the same service that is triggered by a button click on another webapp, how do I trigger the event to fire only when the button is clicked and not continuously keep sending events. Also if you know any idiomatic way to achieve this which getting hacky would be appreciated, want it to be last resort.

[BONUS Question]

I also considered MQTT to send events. But I get a feeling that it isn't possible for a single service to have MQTT and gRPC. I'm skeptical of using MQTT because of its latency and how it will affect internal message passing. If I could limit to external clients it would be great (i.e, x service to use gRPC for internal connections and MQTT for webapp just need one route to be exposed by mqtt). (PS I'm new to microservices so please be comprehensive about your solutions :p)

Thanks in advance for reading till end!

4

There are 4 answers

6
NingaCodingTRV On BEST ANSWER

There is a better way to handle events with SSE of NestJS:

Please see this repo with code example:

https://github.com/ningacoding/nest-sse-bug/tree/main/src

Where basically you have a service:

import {Injectable} from '@nestjs/common';
import {fromEvent} from "rxjs";
import {EventEmitter} from "events";

@Injectable()
export class EventsService {

    private readonly emitter = new EventEmitter();

    subscribe(channel: string) {
        return fromEvent(this.emitter, channel);
    }

    emit(channel: string, data?: object) {
        this.emitter.emit(channel, {data});
    }

}

Obviously, channel can be any string, as recommendation use path style.

For example: "events/for/<user_id>" and users subscribed to that channel will receive only the events for that channel and only when are fired ;) -

Fully compatible with @UseGuards, etc. :)

Additional note: Don't inject any service inside EventsService, because of a known bug.

12
Jay McDoniel On

You can. The important thing is that in NestJS SSE is implemented with Observables, so as long as you have an observable you can add to, you can use it to send back SSE events. The easiest way to work with this is with Subjects. I used to have an example of this somewhere, but generally, it would look something like this

@Controller()
export class SseController {
  constructor(private readonly sseService: SseService) {}

  @SSE()
  doTheSse() {
    return this.sseService.sendEvents();
  }
}
@Injectable()
export class SseService {
  private events = new Subject();

  addEvent(event) {
    this.events.next(event);
  }

  sendEvents() {
    return this.events.asObservable();
  }
}
@Injectable()
export class ButtonTriggeredService {
  constructor(private readonly sseService: SseService) {}

  buttonClickedOrSomething() {
    this.sseService.addEvent(buttonClickedEvent);
  }
}

Pardon the pseudo-code nature of the above, but in general it does show how you can use Subjects to create observables for SSE events. So long as the @SSE() endpoint returns an observable with the proper shape, you're golden.

1
Ashish Gupta On
  @Sse('sse-endpoint')
  sse(): Observable<any> {
    //data have to strem
    const arr = ['d1','d2', 'd3']; 
    return new Observable((subscriber) => {
        while(arr.len){
            subscriber.next(arr.pop()); // data have to return in every chunk
        }
        if(arr.len == 0) subscriber.complete(); // complete the subscription
    });
  }
0
Bennison J On

Yes, this is possible, instead of using interval, we can use event emitter. Whenever the event is emitted, we can send back the response to the client.

notification.controller.ts

import { Public } from 'src/decorators';
import { Observable } from 'rxjs';
import { FastifyReply } from 'fastify';
import { NotificationService } from './notification.service';

import { Sse, Controller, Res } from '@nestjs/common';

@Public()
@Controller()
export class NotificationController {
  constructor(private notificationService: NotificationService) {}
  @Sse('notifications')
  async sendNotification(@Res() reply: FastifyReply): Promise<Observable<any>> {
    return await this.notificationService.handleConnection();
  }
}

notification.service.ts

import { Injectable } from '@nestjs/common';
import { Subject } from 'rxjs';

@Injectable()
export class NotificationService {
  notificationEvent: Subject<any> = new Subject();
  async handleConnection() {
    setInterval(() => {
      this.notificationEvent.next({ data: { message: 'Hello World' } });
    }, 1000);
    return this.notificationEvent.asObservable();
  }
}
  • here I used the setInterval, just to emit a new event. You can emit the new event in your code wherever you want without the setInterval method.
  • I found some problem with this approach, and I shared the link below, check the below link as well. sse in nestJs