The issue I'm running into is lifetime management. I have an injector, which is shared between the worker threads. I had to wrap it in Arc<>, as a plain & reference was causing lifetime complaints. The local queue isn't shared, so no worries there. Then we come to the stealers.
From the docs, the stealers can be shared across threads. So with 4 threads, each with its own worker, every thread ends up with 3 stealers. However, I again run into lifetime issues, which seem to point to the move of self when the thread is being spawned. The example task picker indicates that the stealers can be passed as a slice of stealers, but what I'm ending up with is a slice of references to stealers.
Maybe I've misunderstood the docs, and I should be using Arc-style reference counting to manage the lifetimes of the injector and stealers. Can someone clarify this for me?
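To make the Arc option concrete, here is a minimal std-only sketch of the ownership shape. It is not crossbeam code: `Queue` is a hypothetical stand-in (an `Arc<Mutex<VecDeque<_>>>`) for the injector and stealer handles, and `run_pool` is an invented helper. The point it illustrates is that `std::thread::spawn` needs a `'static` closure, so each thread gets owned handles (clones) rather than `&` references. With crossbeam itself, `Stealer<T>` implements `Clone`, so I would assume the equivalent is `Arc<Injector<Task>>` plus a `Vec<Stealer<Task>>` of clones per thread, which also matches the `&[Stealer<T>]` slice in the docs' task picker.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

type Task = u32;

// Hypothetical stand-in for a shareable queue handle (NOT a crossbeam type).
// Cloning it clones the Arc, not the underlying queue.
#[derive(Clone)]
struct Queue(Arc<Mutex<VecDeque<Task>>>);

impl Queue {
    fn new() -> Self {
        Queue(Arc::new(Mutex::new(VecDeque::new())))
    }
    fn push(&self, t: Task) {
        self.0.lock().unwrap().push_back(t);
    }
    fn steal(&self) -> Option<Task> {
        self.0.lock().unwrap().pop_front()
    }
}

// Every field is owned, so the struct is 'static and can be moved into a thread.
struct ThreadData {
    injector: Queue,      // shared global queue (an Arc clone)
    task_q: Queue,        // this thread's local queue
    stealers: Vec<Queue>, // owned handles to the other threads' queues
}

// Same lookup order as the question: local queue, then global, then stealers.
fn find_task(local: &Queue, global: &Queue, stealers: &[Queue]) -> Option<Task> {
    local
        .steal()
        .or_else(|| global.steal())
        .or_else(|| stealers.iter().find_map(|s| s.steal()))
}

// Invented helper: runs 3 threads until all queues are empty and
// returns every processed task, sorted.
fn run_pool(tasks: Vec<Task>) -> Vec<Task> {
    let injector = Queue::new();
    for t in tasks {
        injector.push(t);
    }
    let locals: Vec<Queue> = (0..3).map(|_| Queue::new()).collect();
    let handles: Vec<_> = (0..3)
        .map(|i| {
            let data = ThreadData {
                injector: injector.clone(),
                task_q: locals[i].clone(),
                stealers: locals
                    .iter()
                    .enumerate()
                    .filter(|(j, _)| *j != i)
                    .map(|(_, q)| q.clone())
                    .collect(),
            };
            thread::spawn(move || {
                let mut done = Vec::new();
                while let Some(t) = find_task(&data.task_q, &data.injector, &data.stealers) {
                    done.push(t);
                }
                done
            })
        })
        .collect();
    let mut all: Vec<Task> = handles
        .into_iter()
        .flat_map(|h| h.join().unwrap())
        .collect();
    all.sort();
    all
}
```

Calling `run_pool(vec![3, 1, 2])` should hand each task to exactly one thread. No lifetime parameter appears anywhere because nothing is borrowed across the spawn.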
Here's a simplification (of sorts) with a pool of 3 threads. I've removed the lifetime management around the thread's stealers and injector, as it could be either Arc or <'a> or something else.
// details on injector, worker and stealer can be found here:
// https://docs.rs/crossbeam/0.7.3/crossbeam/deque/index.html
struct ThreadData {
    injector: &deque::Injector::<Task>,   // common global queue
    task_q: deque::Worker::<Task>,        // local queue
    stealers: Vec<&deque::Stealer<Task>>  // stealers for other threads local queue
}
impl ThreadData {
    fn spawn(self) -> Option<std::thread::JoinHandle<()>> {
        let thread = std::thread::spawn(move|| {
            find_task(&self.task_q, &self.injector, &self.stealers);
        });
        Some(thread)
    }
}
struct Worker {
    stealer: deque::Stealer<Task>,  // to be shared with other threads
    thread: Option<std::thread::JoinHandle<()>>
}
struct Factory {
    injector: deque::Injector::<Task>,  // owner of the global queue
    workers: Vec<Worker>
}
impl Factory {
    fn new() -> Self {
        Self { injector: deque::Injector::<Task>::new(), workers: Vec::new() }
    }
    fn build_threadpool(mut self) {
        // each ThreadData and Worker is mutated below, so all bindings are `mut`
        let mut t1 = ThreadData {
            injector: &self.injector,
            task_q: deque::Worker::<Task>::new_fifo(),
            stealers: Vec::new(),
        };
        let mut w1 = Worker { stealer: t1.task_q.stealer(), thread: None };
        let mut t2 = ThreadData {
            injector: &self.injector,
            task_q: deque::Worker::<Task>::new_fifo(),
            stealers: Vec::new(),
        };
        let mut w2 = Worker { stealer: t2.task_q.stealer(), thread: None };
        let mut t3 = ThreadData {
            injector: &self.injector,
            task_q: deque::Worker::<Task>::new_fifo(),
            stealers: Vec::new(),
        };
        let mut w3 = Worker { stealer: t3.task_q.stealer(), thread: None };
        // each thread gets the stealers of the other two threads
        t1.stealers.push(&w2.stealer);
        t1.stealers.push(&w3.stealer);
        t2.stealers.push(&w1.stealer);
        t2.stealers.push(&w3.stealer);
        t3.stealers.push(&w1.stealer);
        t3.stealers.push(&w2.stealer);
        // launch threads and save workers
        w1.thread = t1.spawn();
        w2.thread = t2.spawn();
        w3.thread = t3.spawn();
        self.workers.push(w1);
        self.workers.push(w2);
        self.workers.push(w3);
    }
}
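For the <'a> variant of the code above, one possibility is scoped threads. The sketch below is std-only and rests on assumptions: `Queue` is a hypothetical `Mutex<VecDeque<_>>` stand-in rather than a crossbeam type, `run_scoped` is an invented helper, and the local queue is borrowed here only because the stand-in has no separate worker/stealer halves. It shows that `std::thread::scope` (Rust 1.63+) accepts closures that borrow from the enclosing stack frame, so a `ThreadData<'a>` holding plain references can be moved into a thread without Arc. The trade-off is that the threads must finish before the scope returns, so this would not fit a pool that outlives the function that builds it.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;
use std::thread;

type Task = u32;
// Hypothetical stand-in for a thread-safe queue (NOT a crossbeam type).
type Queue = Mutex<VecDeque<Task>>;

// Borrowed version of the question's ThreadData; 'a is tied to the
// queues owned by the function that opens the thread scope.
struct ThreadData<'a> {
    injector: &'a Queue,      // common global queue
    task_q: &'a Queue,        // this thread's local queue
    stealers: Vec<&'a Queue>, // the other threads' local queues
}

fn pop(q: &Queue) -> Option<Task> {
    q.lock().unwrap().pop_front()
}

// Local queue first, then the global queue, then the other threads' queues.
fn find_task(local: &Queue, global: &Queue, stealers: &[&Queue]) -> Option<Task> {
    pop(local)
        .or_else(|| pop(global))
        .or_else(|| stealers.iter().find_map(|s| pop(s)))
}

// Invented helper: drains all queues with 3 scoped threads and
// returns every processed task, sorted.
fn run_scoped(tasks: Vec<Task>) -> Vec<Task> {
    let injector: Queue = Mutex::new(tasks.into_iter().collect());
    let locals: Vec<Queue> = (0..3).map(|_| Mutex::new(VecDeque::new())).collect();

    let mut all = thread::scope(|s| {
        let handles: Vec<_> = (0..locals.len())
            .map(|i| {
                let data = ThreadData {
                    injector: &injector,
                    task_q: &locals[i],
                    stealers: locals
                        .iter()
                        .enumerate()
                        .filter(|(j, _)| *j != i)
                        .map(|(_, q)| q)
                        .collect(),
                };
                // `data` only holds references, which is fine inside a scope.
                s.spawn(move || {
                    let mut done = Vec::new();
                    while let Some(t) = find_task(data.task_q, data.injector, &data.stealers) {
                        done.push(t);
                    }
                    done
                })
            })
            .collect();
        handles
            .into_iter()
            .flat_map(|h| h.join().unwrap())
            .collect::<Vec<Task>>()
    });
    all.sort();
    all
}
```

With `std::thread::spawn` in place of `s.spawn`, the same struct would be rejected, because the borrowed fields do not satisfy the `'static` bound on the spawned closure. That looks like the source of the error pointing at the move of self.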
https://docs.rs/crossbeam/latest/crossbeam/deque/struct.Injector.html
cargo nextest run --no-capture