How to avoid using a lock (Mutex) to read and send messages from a WebSocket connection with async Rust

67 views Asked by At

I reimplement CRDTs algorithms to do tests, in particular the PN-Counter. To simulate multiple clients, I use a client/server architecture where the server forwards sent messages to all other clients. Below is the client code, written in Rust using Tokio and Tungstenite. Each client creates a PN-Counter and modifies it randomly (increment, decrement...), then it sends its update to the other clients. When a client receives an update, it updates its own PN-Counter. How to avoid using an Arc<Mutex>, which seems to slow down execution, to share the PNCounter between 2 threads?

#[tokio::main]
async fn main() {
    // Get domain, port, route from .env...
    let url = format!("ws://{}:{}/{}", domain, port, route);
    let pn_counter = Arc::new(Mutex::new(crdt::PNCounter::new()));
    let pn_counter_clone = pn_counter.clone();

    let (ws_stream, _) = connect_async(url).await.unwrap();
    info!("Successfully connected to the WebSocket");

    let (mut write, mut read) = ws_stream.split();

    tokio::spawn(async move {
        while let Some(msg) = read.next().await {
            let msg = msg.unwrap().to_string();
            info!("Received: {}", msg);
            {
                let mut pn_counter_inner = pn_counter_clone.lock().unwrap();
                parse_string(&msg, &mut pn_counter_inner);
            }
        }
    });

    let mut rng = rand::thread_rng();
    let mut interval = tokio::time::interval(std::time::Duration::from_secs(2));
    loop {
        interval.tick().await;
        let incr: u32 = rng.gen_range(1..10);
        let decr: u32 = rng.gen_range(1..10);
        let msg: String;
        let choice: u32 = rng.gen_range(0..2);
        {
            let mut pn_counter_inner = pn_counter.lock().unwrap();
            if choice == 0 {
                pn_counter_inner.increment(incr);
                msg = format!("+{}", incr);
            } else {
                pn_counter_inner.decrement(decr);
                msg = format!("-{}", decr);
            }
            info!("Counter value: {}", pn_counter_inner.value());
        }
        write.send(Message::Text(msg)).await.unwrap();
    }
}

fn parse_string(input: &str, pn_counter: &mut crdt::PNCounter) {
    let mut chars = input.chars();
    let sign = chars.next().unwrap();
    let num: u32 = chars.as_str().parse().unwrap();
    if sign == '+' {
        pn_counter.increment(num);
    } else {
        pn_counter.decrement(num);
    }
    info!("Counter value: {}", pn_counter.value());
}
0

There are 0 answers