How can I spawn threads that call methods on self?


When trying to write some parallel code, I keep hitting the error below. The solutions I have found all involve locking, which I would prefer to avoid. Is there any way around this?

use std::sync::{Arc, Mutex};
use std::thread;

pub trait PermBrute {
    fn quadgram( &self, max_len: usize, ciphertext: &String ) -> Vec<usize> {
        let mut vec : Vec<(f64, Vec<usize>)> = Vec::new();
        let results = Arc::new(Mutex::new(vec));
        let mut threads = vec![];

        for i in 0..*CPUS {
            threads.push( thread::spawn({
                let clone = Arc::clone(&results);
                let text = ciphertext.clone();
                move || {
                    // some code here
                    let hold = self.decrypt(&text, &key); // `key` comes from the elided code above
                    // some more code here

                    let mut v = clone.lock().unwrap();
                    v.push(best_key);
                }
            }));
        }

        for t in threads {
            t.join().unwrap();
        }

        let lock = Arc::try_unwrap(results).expect("Lock still has multiple owners");
        let mut hold = lock.into_inner().expect("Mutex cannot be locked");

        // do some stuff with hold and return out

        return out;
    }

    fn decrypt( &self, ciphertext : &String, key : &Vec<usize>) -> String;
}
It fails to compile with:

error[E0277]: `Self` cannot be shared between threads safely
   --> src/ciphers/cipher.rs:131:27
    |
108 |     fn quadgram( &self, max_len: usize, ciphertext: &String ) -> Vec<usize> {
    |                                                                            - help: consider further restricting `Self`: `where Self: std::marker::Sync`
...
131 |             threads.push( thread::spawn({
    |                           ^^^^^^^^^^^^^ `Self` cannot be shared between threads safely
    |
    = help: the trait `std::marker::Sync` is not implemented for `Self`
    = note: required because of the requirements on the impl of `std::marker::Send` for `&Self`
    = note: required because it appears within the type `[closure@src/ciphers/cipher.rs:140:17: 164:18 text:std::string::String, start_len:usize, end_len:usize, count:usize, start_points_clone:std::vec::Vec<usize>, local_self:&Self, end:usize, clone:std::sync::Arc<std::sync::Mutex<std::vec::Vec<(f64, std::vec::Vec<usize>)>>>]`

Answer by Alice Ryhl (accepted)

With the rayon crate, you can do this with a parallel iterator: each closure returns its result and collect() gathers them, so no Mutex is needed.

use rayon::prelude::*;

// `Sync` is needed once the closure below calls methods on `self`,
// because rayon runs the closure on several threads at once.
pub trait PermBrute: Sync {
    fn quadgram(&self, max_len: usize, ciphertext: &String) -> Vec<usize> {
        let best_keys: Vec<(f64, Vec<usize>)> = (0..*CPUS)
            .into_par_iter()
            .map(|i| {
                // some code here
                // you can access `ciphertext` (and `self`) here directly without copying
                let best_key = todo!();
                // some more code here

                best_key
            })
            .collect();

        // do some stuff with best_keys and return out

        return out;
    }

    fn decrypt(&self, ciphertext: &String, key: &Vec<usize>) -> String;
}
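If you would rather not add a dependency, the same shape (each worker returns its result instead of pushing it into a Mutex) can be sketched with std::thread::scope, assuming Rust 1.63 or newer. This is not the answer's code: best_key, score and n_threads are hypothetical names, and decrypt takes &str and &[usize] instead of &String and &Vec<usize>.

```rust
use std::thread;

/// Sync supertrait: lets `&self` be shared with the scoped worker threads.
pub trait PermBrute: Sync {
    fn decrypt(&self, ciphertext: &str, key: &[usize]) -> String;

    /// Hypothetical scoring hook (higher is better), for illustration only.
    fn score(&self, plaintext: &str) -> f64;

    /// Hypothetical method: scores candidate keys in parallel and returns the best one.
    fn best_key(&self, ciphertext: &str, candidates: &[Vec<usize>], n_threads: usize) -> Option<Vec<usize>> {
        let n = n_threads.max(1);
        // Ceiling division, clamped to 1 because `chunks(0)` panics.
        let chunk_len = ((candidates.len() + n - 1) / n).max(1);

        thread::scope(|s| {
            let mut handles = Vec::new();
            for chunk in candidates.chunks(chunk_len) {
                // Borrowing `self`, `ciphertext` and `chunk` is allowed because the
                // scope joins every thread before `thread::scope` returns.
                handles.push(s.spawn(move || {
                    chunk
                        .iter()
                        .map(|key| (self.score(&self.decrypt(ciphertext, key)), key.clone()))
                        .max_by(|a, b| a.0.total_cmp(&b.0))
                }));
            }
            // Each worker hands back its local best through join(); no Mutex involved.
            handles
                .into_iter()
                .filter_map(|h| h.join().unwrap())
                .max_by(|a, b| a.0.total_cmp(&b.0))
                .map(|(_, key)| key)
        })
    }
}
```

Splitting the candidates into n_threads chunks keeps the number of threads bounded, and the PermBrute: Sync bound is what allows &self to be shared with the workers.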
Answer by test failed in 1.08s

It took me some time to modify your code so that I could test it on the Rust Playground. Here's the modified source code:

use std::sync::{Arc, Mutex};
use std::thread;

pub trait PermBrute {
    fn quadgram( &self, max_len: usize, ciphertext: &String ) -> Vec<usize> {
        let mut vec : Vec<(f64, Vec<usize>)> = Vec::new();
        let results = Arc::new(Mutex::new(vec));
        let mut threads = vec![];

        for i in 0..10 {
            threads.push( thread::spawn({
                let clone = Arc::clone(&results);
                let text = ciphertext.clone();
                move || {
                    // some code here
                    let hold = self.decrypt( &String::new(), &vec![] );
                    // some more code here

                    let mut v = clone.lock().unwrap();
                    // v.push(best_key);
                }
            }));
        }

        for t in threads {
            t.join().unwrap();
        }

        let lock = Arc::try_unwrap(results).expect("Lock still has multiple owners");
        let mut hold = lock.into_inner().expect("Mutex cannot be locked");

        // do some stuff with hold and return out

        // return out;
        unimplemented!()
    }

    fn decrypt(&self, ciphertext: &String, key: &Vec<usize>) -> String;
}

First, you can require Self to be Sync by adding a supertrait bound:

pub trait PermBrute: Sync {}

rustc then starts complaining about lifetimes:

(The resulting error is too long to paste here; you can reproduce it on the playground.)

This post should answer your question. In short, spawned threads run in the background: thread::spawn requires its closure to be 'static, and rustc does not take your join calls into account, so the closure cannot borrow self. Workarounds include Arc<Self> or AtomicPtr<Self>.


Update

Let us start with a minimal example:

use std::thread;

fn try_to_spawn() {
    let x: String = "5".to_string();
    let j = thread::spawn(|| {
        println!("{}", x.len());
    });
    j.join().unwrap();
}

Here, rustc says:

error[E0373]: closure may outlive the current function, but it borrows `x`, which is owned by the current function
 --> src/lib.rs:5:27
  |
5 |     let j = thread::spawn(|| {
  |                           ^^ may outlive borrowed value `x`
6 |         println!("{}", x.len());
  |                        - `x` is borrowed here
  |

help: to force the closure to take ownership of `x` (and any other referenced variables), use the `move` keyword
  |
5 |     let j = thread::spawn(move || {
  |                           ^^^^

Here rustc complains about the lifetime of the borrowed x. From rustc's point of view, the spawned thread runs in the background and may finish either before or after try_to_spawn returns, so x could be dangling when x.len() runs.

But we join the thread before the function returns, so x clearly lives long enough; from a human point of view, a 'static lifetime is not required. However, thread::spawn demands a 'static closure, and rustc does not know anything about our join.
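Since Rust 1.63, the standard library offers std::thread::scope, which makes that join visible to the compiler: every thread spawned inside the scope is joined before scope returns, so the closure may borrow local variables. A minimal sketch of the example above rewritten that way, assuming Rust 1.63 or newer (returning the tuple is only for illustration):

```rust
use std::thread;

fn try_to_spawn() -> (usize, String) {
    let x: String = "5".to_string();
    let len = thread::scope(|s| {
        // Borrows `x`: no `move` and no Arc, because the scope joins this thread.
        let j = s.spawn(|| x.len());
        j.join().unwrap()
    });
    // `x` was only borrowed, so we still own it here.
    (len, x)
}
```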

You could move x into the closure instead of borrowing it, but then x can no longer be used afterwards. To solve the problem in a "safe" way while keeping x, use Arc<String>:

use std::thread;
use std::sync::Arc;

fn try_to_spawn() {
    let x: Arc<String> = Arc::new("5".to_string());
    let x_clone: Arc<String> = x.clone();
    let j = thread::spawn(move || {
        println!("{}", x_clone.len());
    });
    j.join().unwrap();
    println!("{}", x.len());
}

But Arc has some overhead: a heap allocation plus atomic reference-count updates. One might want to use raw pointers (*const String or *mut String) to sidestep lifetime checks; however, raw pointers are neither Send nor Sync, so they cannot be moved into a spawned thread directly. To share a resource through a pointer between threads, you can use AtomicPtr, or wrap the pointer in a type with an unsafe impl Send (here is a discussion about making raw pointers Send + Sync).
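A sketch of the wrapper-type option, using a hypothetical type named SendPtr. The unsafe impl Send is a promise the compiler cannot check; it only holds here because the thread is joined before x goes out of scope.

```rust
use std::thread;

// Hypothetical wrapper: lets a raw pointer be moved into a spawned thread.
struct SendPtr<T>(*const T);

// SAFETY: the pointee is only read through `&T`, which is fine across threads
// when `T: Sync`, provided it outlives the thread (guaranteed by the join below).
unsafe impl<T: Sync> Send for SendPtr<T> {}

impl<T> SendPtr<T> {
    // Calling a method makes the closure capture the whole wrapper; naming the
    // field `ptr.0` directly would capture only the raw pointer (edition 2021),
    // which is not `Send`.
    fn get(&self) -> *const T {
        self.0
    }
}

fn len_in_thread(x: &String) -> usize {
    let ptr = SendPtr(x as *const String);
    let j = thread::spawn(move || {
        // SAFETY: `x` is still alive because we join before returning.
        let s: &String = unsafe { &*ptr.get() };
        s.len()
    });
    j.join().unwrap()
}
```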


Back to your question: what about self (of type &Self)? It is also a reference, and rustc likewise cannot figure out its "true lifetime":

use std::thread;
use std::sync::Arc;

struct S;

impl S {
    fn try_to_spawn(&self) {
        let j = thread::spawn(|| {
            self.do_something();
        });
        j.join().unwrap();
    }
    
    fn do_something(&self) {}
}

This yields the following error:

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
  --> src/lib.rs:8:31
   |
8  |           let j = thread::spawn(|| {
   |  _______________________________^
9  | |             self.do_something();
10 | |         });
   | |_________^
   |

This error looks different from the previous one, but it is closer to the one in your code. Again, you can resolve it with Arc<Self>:

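// Inside `impl S`; call it as `Arc::new(S).try_to_spawn()`.
fn try_to_spawn(self: Arc<Self>) {
    // The Arc itself is moved into the thread, so nothing is borrowed.
    let j = thread::spawn(move || {
        self.do_something();
    });
    j.join().unwrap();
}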

or use AtomicPtr<Self>:

use std::thread;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering::Relaxed;

struct S;

impl S {
    fn try_to_spawn(&self) {
        let self_ptr: AtomicPtr<Self> 
            = AtomicPtr::new(self as *const Self as *mut Self);
        let j = thread::spawn(move || {
            unsafe {
                self_ptr.load(Relaxed) // *mut Self
                    .as_ref()          // Option<&Self>
                    .unwrap()          // &Self
                    .do_something();
            }
        });
        j.join().unwrap();
    }
    
    fn do_something(&self) {}
}

This works, but it is ugly. I would also suggest using a crate like rayon for parallel computations. Still, I hope this answer helps in cases where you want to create threads manually.
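The Arc<Self> workaround shown earlier also scales to several workers without a lock: clone the Arc once per thread and return each partial result through join() rather than pushing it into a shared Mutex. Summer and parallel_sum are hypothetical names used only for this sketch.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for a type whose data every worker reads.
struct Summer {
    data: Vec<u64>,
}

impl Summer {
    // Worker i sums elements i, i + n, i + 2n, ...; each gets its own Arc clone.
    fn parallel_sum(self: Arc<Self>, n_threads: usize) -> u64 {
        let n = n_threads.max(1);
        // Collect first so all threads are spawned before any join blocks.
        let handles: Vec<_> = (0..n)
            .map(|i| {
                let me = Arc::clone(&self);
                thread::spawn(move || me.data.iter().skip(i).step_by(n).sum::<u64>())
            })
            .collect();
        // Partial sums come back through join(), so no Mutex is needed.
        handles.into_iter().map(|h| h.join().unwrap()).sum()
    }
}
```

The caller keeps its own clone, e.g. Arc::clone(&s).parallel_sum(4), so s is still usable afterwards.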