I have the following code, which uses a bounded crossbeam channel whose capacity (20) is less than the total number of items (32) that I want to send. My goal is to have multiple sender threads (8) each send a fixed number (4) of values to a single receiver through the channel, with all of this happening in parallel for efficiency. This is just a small prototype of a bigger problem that I'm trying to solve. However, the code that I have hangs forever and never exits or times out. I also know why it's happening: r.iter() blocks until the sender is dropped, which only happens outside of the scope. I tried various approaches that didn't work:
Cloning the sender within each thread and then dropping the clone (as you can see in the comments)
Having the receiver code outside the scope, but then the final vector only contained 20 items instead of the desired 32.
fn main() {
    use crossbeam_channel::{unbounded, bounded};
    use crossbeam_utils::thread::scope;
    use itertools::Itertools;

    let (s, r) = bounded(20);
    let mut v = vec![];

    scope(|scope| {
        scope.spawn(|_| {
            for data in r.iter() {
                v.push(data);
            }
        });

        let _sender_threads = (0..8)
            .into_iter()
            .map(|_| {
                scope.spawn(|_| {
                    // let s_clone = s.clone();
                    for i in 0..4 {
                        // std::thread::sleep(std::time::Duration::from_millis(100));
                        match s.send(i) {
                            Ok(_) => {
                                // println!("sent i {:?}", i);
                                ()
                            },
                            Err(_)=> {
                                // println!("error sending i {:?}", i);
                                ()
                            }
                        };
                    }
                    // drop(s_clone);
                })
            })
            .collect_vec();
    }).expect("scope error.");

    drop(s);
    println!("{:?}", v);
}
This is happening because s isn't dropped until the scope ends, but the scope won't end until all threads have exited, and the thread calling r.iter() won't exit until s is dropped. This is a classic deadlock scenario.

You need to drop s inside the scope, but you can only do that once the sender threads have all exited, so you can't drop(s); in the scope the way this is currently written.

The simplest way around this is to clone s for each sender thread and move the clone into the thread's closure, then drop s in the main scope afterwards.

Concretely, add let s = s.clone(); just before each sender thread is spawned, and mark that thread's closure with move so the closure takes ownership of the clone. Then move drop(s) into the scope, after the sender threads have been spawned. Now once all sender threads have exited, the sending side of the channel is closed, and the receiver's for loop will terminate.
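The clone-per-thread pattern described above can be sketched as follows. This is a minimal illustration, not the original answer's code. To stay runnable without external crates, it assumes std::sync::mpsc::sync_channel and std::thread::scope as stand-ins for crossbeam's bounded and scope. The helper name collect_from_senders and the binding v_ref are hypothetical.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical helper: spawns `senders` threads that each send `per_sender`
/// values through a bounded channel, and returns everything the receiver got.
fn collect_from_senders(senders: usize, per_sender: usize, capacity: usize) -> Vec<usize> {
    let (s, r) = sync_channel(capacity);
    let mut v: Vec<usize> = Vec::new();
    // Mutable borrow handed to the receiver thread; it ends when the scope ends.
    let v_ref = &mut v;

    thread::scope(|scope| {
        // Receiver thread: this loop ends only once every sender has been dropped.
        scope.spawn(move || {
            for data in r.iter() {
                v_ref.push(data);
            }
        });

        for _ in 0..senders {
            // Each sender thread owns its own clone, dropped when the thread exits.
            let s = s.clone();
            scope.spawn(move || {
                for i in 0..per_sender {
                    s.send(i).expect("receiver disconnected");
                }
            });
        }

        // Drop the original sender inside the scope so that only the clones
        // held by the sender threads keep the channel open.
        drop(s);
    });

    v
}

fn main() {
    let v = collect_from_senders(8, 4, 20);
    println!("{} values received: {:?}", v.len(), v);
}
```

The std receiver is not Sync, so this sketch moves r into the receiver thread and passes the vector as v_ref; the question's crossbeam code can keep borrowing r and v directly. The three changes that matter are the same in both versions: the per-thread clone, the move closure, and drop(s) inside the scope.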