Rust-based WebSocket server blocks after n connections


First of all, I'm fairly new to Rust. I'm implementing a WebSocket server for our application that may someday run in our Kubernetes cluster (yes, I'm aware of the statefulness issue with WebSockets in such an environment).

At the moment this is a proof-of-concept implementation. I first built the WebSocket server on axum and have now rebuilt it on warp. Both implementations have the same issue. The server accepts connections, gives each one a unique UUID session ID, and sends back whatever message it receives.

When I run a performance test with Locust (Python), the WebSocket server stops accepting new connections after only a couple of connections. The threshold differs between my Mac laptop and my Linux laptop (it is higher on Linux).
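To find the threshold without Locust in the loop, a small probe like the one below could help. It is only a sketch using the standard library: the function name `open_connections`, the cap of 10,000 and the 2-second timeout are made up for illustration, and the address is the one the server below binds to. It opens plain TCP connections and keeps them open, so it does not perform the WebSocket handshake. A successful connect only shows that the operating system accepted the connection, not that the server upgraded it.

```rust
use std::io;
use std::net::{SocketAddr, TcpStream};
use std::time::Duration;

/// Opens up to `max` TCP connections to `addr` and keeps them all open.
/// Returns the open streams together with the first error, if any occurred.
/// (Hypothetical helper for illustration; it does not speak WebSocket.)
fn open_connections(
    addr: SocketAddr,
    max: usize,
    timeout: Duration,
) -> (Vec<TcpStream>, Option<io::Error>) {
    let mut streams = Vec::new();
    for _ in 0..max {
        match TcpStream::connect_timeout(&addr, timeout) {
            Ok(stream) => streams.push(stream),
            // Stop at the first failure so the caller can see where it happened.
            Err(e) => return (streams, Some(e)),
        }
    }
    (streams, None)
}

fn main() {
    // Assumption: the server from this question is listening on 127.0.0.1:8080.
    let addr: SocketAddr = "127.0.0.1:8080".parse().expect("valid socket address");
    let (streams, err) = open_connections(addr, 10_000, Duration::from_secs(2));
    println!("opened {} connections", streams.len());
    if let Some(e) = err {
        println!("first failure: {e}");
    }
}
```

If the probe stops at roughly the same count as the Locust run, the error it prints may indicate whether the limit sits on the client or the server side.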

There are no crashes in the Rust WebSocket server, nor any other indication of a problem. It feels like I'm hitting some kind of "natural" limit here. I'm pretty sure it's something basic or fundamental that I'm missing.
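One "natural" limit that might be worth ruling out is the per-process limit on open file descriptors, since every connection uses one descriptor in the server and another in the load generator. Whether that is what happens here is only an assumption. The sketch below reads the limit on Linux from `/proc/self/limits`. The helper name `parse_open_file_limits` is hypothetical, and on macOS that file does not exist, so `ulimit -n` in the shell would be the equivalent check.

```rust
use std::fs;

/// Extracts the soft and hard "Max open files" limits from the text of
/// /proc/<pid>/limits. The values stay strings because either one may be
/// the word "unlimited". (Hypothetical helper for illustration.)
fn parse_open_file_limits(limits: &str) -> Option<(String, String)> {
    const LABEL: &str = "Max open files";
    let line = limits.lines().find(|l| l.starts_with(LABEL))?;
    // After the label the columns are: soft limit, hard limit, units.
    let mut fields = line[LABEL.len()..].split_whitespace();
    let soft = fields.next()?.to_string();
    let hard = fields.next()?.to_string();
    Some((soft, hard))
}

fn main() {
    // Linux only: this reports the limits of the process that runs this code.
    match fs::read_to_string("/proc/self/limits") {
        Ok(text) => match parse_open_file_limits(&text) {
            Some((soft, hard)) => println!("open files: soft={soft}, hard={hard}"),
            None => println!("no \"Max open files\" line found"),
        },
        Err(e) => println!("could not read /proc/self/limits: {e}"),
    }
}
```

The limit is set per process and inherited from the shell that starts it, so the value that matters is the one in the shell launching the server and the one launching Locust.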

Any hints are more than welcome. I'm really looking forward to learning more about Rust :)

This is the very simplified WebSocket server code I created to reproduce the error, in this case with warp (as mentioned, the axum version has exactly the same issue):

use warp::{Filter, Rejection};

use tracing::info;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

type Result<T> = std::result::Result<T, Rejection>;


#[tokio::main]
async fn main() {
    // initialize tracing
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| "enarc=debug,tower_http=debug".into()),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();
    info!("Logging has been initialized");

    // configure the websocket route
    info!("Configuring websocket route");
    let ws_route = warp::path("ws")
        .and(warp::ws())
        .and_then(crate::handler::ws_handler);

    let routes = ws_route.with(warp::cors().allow_any_origin());

    info!("Starting server");
    warp::serve(routes).run(([127, 0, 0, 1], 8080)).await;
}

mod handler {
    use crate::Result;
    use warp::Reply;
    use warp::ws::{WebSocket, Message};
    use uuid::Uuid;
    use tracing::{info, warn, debug};

    use futures::StreamExt;
    use futures_util::SinkExt;

    pub async fn ws_handler(ws: warp::ws::Ws) -> Result<impl Reply> {
        info!("New websocket connection");
        Ok(ws.on_upgrade(move |socket| client_connection(socket)))
    }

    pub async fn client_connection(ws: WebSocket) {
        let (mut client_ws_sender, mut client_ws_rcv) = ws.split();

        let uuid = Uuid::new_v4().simple().to_string();
        info!("{} new Session", uuid);

        while let Some(result) = client_ws_rcv.next().await {
            match result {
                Ok(msg) => {
                    if msg.is_text() {
                        match msg.to_str() {
                            Ok(s) => {
                                debug!("Received message from {}: {:?}", uuid, s);

                                // Echo the text back to the client.
                                if let Err(e) = client_ws_sender.send(Message::text(format!("You said: \"{}\"", s))).await {
                                    warn!("error sending message for id {}: {}", uuid, e);
                                    break;
                                }
                            },
                            Err(e) => {
                                warn!("error converting message for id {}: {:#?}", uuid, e);
                                break;
                            }
                        };
                    } else if msg.is_close() {
                        debug!("Received close message from {}", uuid);
                        break;
                    }
                },
                Err(e) => {
                    info!("error receiving message for id {}: {}", uuid, e);
                    break;
                }
            };
        }
        info!("{} disconnected", uuid);
    }
}