I reimplement CRDTs algorithms to do tests, in particular the PN-Counter. To simulate multiple clients, I use a client/server architecture where the server forwards sent messages to all other clients. Below is the client code, written in Rust using Tokio and Tungstenite. Each client creates a PN-Counter and modifies it randomly (increment, decrement...), then it sends its update to the other clients. When a client receives an update, it updates its own PN-Counter. How to avoid using an Arc<Mutex>, which seems to slow down execution, to share the PNCounter between 2 threads?
#[tokio::main]
async fn main() {
// Get domain, port, route from .env...
let url = format!("ws://{}:{}/{}", domain, port, route);
let pn_counter = Arc::new(Mutex::new(crdt::PNCounter::new()));
let pn_counter_clone = pn_counter.clone();
let (ws_stream, _) = connect_async(url).await.unwrap();
info!("Successfully connected to the WebSocket");
let (mut write, mut read) = ws_stream.split();
tokio::spawn(async move {
while let Some(msg) = read.next().await {
let msg = msg.unwrap().to_string();
info!("Received: {}", msg);
{
let mut pn_counter_inner = pn_counter_clone.lock().unwrap();
parse_string(&msg, &mut pn_counter_inner);
}
}
});
let mut rng = rand::thread_rng();
let mut interval = tokio::time::interval(std::time::Duration::from_secs(2));
loop {
interval.tick().await;
let incr: u32 = rng.gen_range(1..10);
let decr: u32 = rng.gen_range(1..10);
let msg: String;
let choice: u32 = rng.gen_range(0..2);
{
let mut pn_counter_inner = pn_counter.lock().unwrap();
if choice == 0 {
pn_counter_inner.increment(incr);
msg = format!("+{}", incr);
} else {
pn_counter_inner.decrement(decr);
msg = format!("-{}", decr);
}
info!("Counter value: {}", pn_counter_inner.value());
}
write.send(Message::Text(msg)).await.unwrap();
}
}
fn parse_string(input: &str, pn_counter: &mut crdt::PNCounter) {
let mut chars = input.chars();
let sign = chars.next().unwrap();
let num: u32 = chars.as_str().parse().unwrap();
if sign == '+' {
pn_counter.increment(num);
} else {
pn_counter.decrement(num);
}
info!("Counter value: {}", pn_counter.value());
}