Excluding consumer of a topic by sender in Pulsar

135 views Asked by At

Intro

We're developing a system to support multiple real-time messages (chat) and updates (activity notifications).

That is, user A can receive via Web Socket messages for :

  1. receiving new chat messages
  2. receiving updates for some activity, for example if someone like their photo.
  3. and more.

We use one single WebSocket connection to send all these different messages to the client.

However, we also support multiple applications/clients to be open by the user at the same time. (i.e - user A connect on their web browser, and also from their mobile app, at the same time).

Architecture

We have a "Hub" that stores a map of UserId to a list of active websocket sessions.
(user:123 -> listOf(session#1, session#2))

Each client, once websocket connection is established, has its own Consumer which subscribes to a pulsar topic "userId" (e.g - user:123 topic).
If user A connected on both mobile and web, each client has its own Consumer to topic user:A.

When user A sends a new message from session #1 to user B, the flow is :

  1. user makes a REST POST request to send a message.
  2. service stores a new message to DB.
  3. service sends a Pulsar message to topic user:B and user:A.
  4. return 200 status code + created Message response.

Problem

If user A has two sessions open (two clients/websockets), and they send a message from session #1, how can we make sure only session #2 gets the message ?

Since user A has already received the 200 response with the created message in session #1, there's no need to send the message to him again by sending a message to his Consumer.

I'm not sure if it's a Pulsar configuration, or perhaps our architecture is wrong.

1

There are 1 answers

0
J_H On

how can we make sure only session #2 gets the message ?

I'm going to address this at the app level.


Prepend a unique nonce (e.g. a guid) to each message sent. Maintain a short list of recently sent nonces, aging them out so we never have more than, say, half a dozen.

Upon receiving a message, check to see if we sent it. That is, check to see if its nonce is in the list. If so, silently discard it.


Equivalently, name each connection.

You could roll a guid just once when a new websocket is opened. Or you could incorporate some of the websocket's addressing bits into the name.

Prepend the connection name to each outbound message. Discard any received message which has "sender" of "self".


With this de-dup'ing approach there's still some wasted network bandwidth. We can quibble about it if you wish. When the K-th websocket is created, we could create K topics, each excluding a different endpoint. Sounds like more work than it's worth!