Using Redis with uWebSockets C++ server

856 views Asked by At

I currently have a C++ web sockets server using uWebSockets. I want to scale it horizontally using Redis. It means that I'll use this Redis client. However, I'm facing a problem with the implementation of pub/sub channels. Indeed, since the Redis channel subscription needs its own event loop (according to this example), and obviously the same for the uWebSockets app (see this example), I end up with two event loops. And my problem is that I don't know how to manage running these two loops properly.

I tried running them on two different threads, which works if they are totally independent of each other. However, since I want to broadcast the upcoming Redis message to all web sockets client, I need the uWebSockets app instance (see this example) in the Redis thread to broadcast it:

Subscriber sub = redis->subscriber();

sub.on_message([](std::string channel, std::string msg){
    app->publish("broadcast", msg, (uWS::OpCode)1);
});

Therefore the two event loops are not independant of each other and when I received a message from Redis, it takes about 5 seconds before it is handled by the uWebSockets app.

Does someone know how to properly set up this Redis pus/sub feature ? Thank you for your help.

1

There are 1 answers

0
Bastien Faivre On

I managed to solve my problem.

I found that calling app->publish(...) in my second thread was not thread-safe. Indeed, an interesting post showed me that in order to access the app from another thread, we have to use the method defer on the event loop. Therefore, the structure becomes:

...

uWS::SSLApp *app = nullptr;
uWS::Loop *loop = nullptr;

Redis *redis = nullptr;

...

void redisEventLoopThread(Subscriber *sub) {

    sub->on_message([](string channel, string msg) {
        loop->defer([msg]() {
            app->publish(channel, msg, ...);
        });
    });

    sub->subscribe("channel_name");

    while (true) {
        try {
            sub->consume();
        } catch (const Error &err) {...}
    }
}

...

int main() {
    app = new uWS::SSLApp();
    loop = uWS::Loop::get();

    redis = new Redis(...);
    Subscriber sub = redis->subscriber();

    thread redisThread(redisEventLoopThread, &sub);

    app->ws<...>(...).listen(...).run();

    ...
}