Reactive WebFlux: Publish Notifications to a Specific Subscriber


I want to build a user notification system. The idea is that when a user is logged in, the client subscribes to the notification service, and as notifications are generated for that user, the service returns the number of new notifications.

I'm using Java Spring Boot with WebFlux to achieve this. I was able to build a basic working example using an EmitterProcessor: every time I add a notification, it sends the number of new notifications to the subscribers, based on a user identifier.

I run the application and set up one subscriber, say user A. When I add a notification, user A receives the update. The problem starts when I add two more subscribers (B and C): when I create a notification for user A, users B and C also receive the update.

So my question is: using WebFlux, is there a way to send the notification update only to the subscriber that represents the correct user?

Basics of my code below:

FluxProcessor and FluxSink initialization

// One processor and one sink, shared by every subscriber.
private final FluxProcessor<Integer, Integer> processor;
private final FluxSink<Integer> sink;

public NotificationController() {
    this.processor = EmitterProcessor.<Integer>create().serialize();
    this.sink = processor.sink();
}

Subscribe Inbox method

@GetMapping(value = "/inbox/{userId}")
public Flux<ServerSentEvent<Integer>> subscribeInbox(@PathVariable String userId) {

    // Wrap each emitted count in a server-sent event.
    Flux<ServerSentEvent<Integer>> serverSentEventFlux =
            this.processor.map(e -> ServerSentEvent.builder(e).build());

    List<Notification> notificationList = this.repositoryMap.get(userId);
    if (notificationList == null) {
        notificationList = new ArrayList<>();
    }

    // Push this user's current notification count onto the shared sink.
    this.sink.next(notificationList.size());

    return serverSentEventFlux;
}

Method for externally forcing a notification publish

@PostMapping(value = "/{userId}", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<Void> publishNotifications(@PathVariable String userId) {

    List<Notification> notificationList = this.repositoryMap.get(userId);
    if (notificationList == null) {
        notificationList = new ArrayList<>();
    }

    // Push this user's current notification count onto the shared sink.
    this.sink.next(notificationList.size());
    return ResponseEntity.ok().build();
}

Thanks in advance.
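
The behaviour described in the question is consistent with every subscriber reading from one shared processor: whatever is pushed onto the single sink reaches all of them, whichever userId triggered it. One possible direction is to key the delivery channel by user. The following is only a minimal, library-free sketch of that routing idea in plain Java; the class PerUserNotifier and its methods are hypothetical names that do not appear in the code above. In Reactor terms, the analogous change would probably be to keep one sink per user (or to filter a shared stream by user id) instead of a single shared processor.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical helper: routes notification counts to listeners keyed by user id. */
final class PerUserNotifier {

    // One listener list per user id, instead of one channel shared by everyone.
    private final Map<String, List<Consumer<Integer>>> listenersByUser =
            new ConcurrentHashMap<>();

    /** Registers a listener for one user and returns a handle that unregisters it. */
    Runnable subscribe(String userId, Consumer<Integer> listener) {
        List<Consumer<Integer>> listeners =
                listenersByUser.computeIfAbsent(userId, id -> new CopyOnWriteArrayList<>());
        listeners.add(listener);
        return () -> listeners.remove(listener);
    }

    /** Delivers the count only to the listeners registered under the given user id. */
    void publish(String userId, int count) {
        for (Consumer<Integer> listener : listenersByUser.getOrDefault(userId, List.of())) {
            listener.accept(count);
        }
    }
}
```

With this shape, publishing for user A invokes only the listeners registered under "A"; listeners for B and C are not called. In the controller, the subscribe step would correspond to the /inbox/{userId} endpoint and the publish step to the POST endpoint, and the unregister handle would be run when the client disconnects.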
