Parallelising file processing using rayon


I have 7 CSV files (55 MB each) in a local source folder that I want to convert to JSON and store in a local target folder. My OS is macOS (quad-core Intel i5). It is a simple Rust program, run from the console as

./target/release/convert <source-folder> <target-folder>

My multithreading approach using Rust threads is as follows:

fn main() -> Result<()> {
    let source_dir = PathBuf::from(get_first_arg()?);
    let target_dir = PathBuf::from(get_second_arg()?);

    let paths = get_file_paths(&source_dir)?;

    let mut handles = vec![];
    for source_path in paths {
        let target_path = create_target_file_path(&source_path, &target_dir)?;

        handles.push(thread::spawn(move || {
            let _ = convert(&source_path, &target_path);
        }));
    }

    for h in handles {
        let _ = h.join();
    }

    Ok(())
}

Running it under time to measure CPU utilisation gives:

2.93s user 0.55s system 316% cpu 1.098 total

Then I tried to implement the same task with the rayon (thread pool) crate:

fn main() -> Result<()> {
    let source_dir = PathBuf::from(get_first_arg()?);
    let target_dir = PathBuf::from(get_second_arg()?);

    let paths = get_file_paths(&source_dir)?;
    let pool = rayon::ThreadPoolBuilder::new().num_threads(15).build()?;

    for source_path in paths {
        let target_path = create_target_file_path(&source_path, &target_dir)?;

        pool.install(|| {
            let _ = convert(&source_path, &target_path);
        });
    }

    Ok(())
}

Running it under time to measure CPU utilisation gives:

2.97s user 0.53s system 98% cpu 3.561 total

I don't see any improvement with rayon: the wall-clock time is about three times longer and CPU usage stays below 100%. I am probably using rayon the wrong way. Does anyone have an idea what is wrong with it?
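The two time results already point at the cause. The cpu column is (user + system) divided by the wall-clock total, so it shows how many cores were busy on average. A small sketch that redoes that arithmetic on the figures above (the function name cpu_utilisation is just for illustration): the std::thread run works out to about 3.48 / 1.098 ≈ 317% (more than three cores busy), while the install() run works out to about 3.50 / 3.561 ≈ 98%, i.e. roughly one core doing all the work.

```rust
/// CPU utilisation as `time` reports it: (user + system) / wall-clock, in percent.
fn cpu_utilisation(user_s: f64, system_s: f64, total_s: f64) -> f64 {
    (user_s + system_s) / total_s * 100.0
}

fn main() {
    // Figures copied from the two runs above (seconds).
    let threads = cpu_utilisation(2.93, 0.55, 1.098);
    let install = cpu_utilisation(2.97, 0.53, 3.561);
    println!("std::thread:   {:.0}% cpu", threads);
    println!("rayon install: {:.0}% cpu", install);
    // Wall-clock slowdown of the install() version relative to std::thread.
    println!("slowdown: {:.1}x", 3.561 / 1.098);
}
```

Total CPU time (user + system) is almost identical in both runs; only the amount of overlap differs, which suggests the rayon version is running the conversions sequentially.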

Update (09 Apr)

After fighting with the Rust borrow checker for a while, I want to share a solution that may help others; suggestions for a better approach are welcome:

pool.scope(move |s| {
    for source_path in paths {
        let target_path = create_target_file_path(&source_path, &target_dir).unwrap();
        s.spawn(move |_s| {
            convert(&source_path, &target_path).unwrap();
        });
    }
});

However, on 113 files it still does not beat the std::thread approach:

46.72s user 8.30s system 367% cpu 14.955 total

Update (10 Apr)

After @maxy's comment:

// rayon solution
paths.into_par_iter().for_each(|source_path| {
    let target_path = create_target_file_path(&source_path, &target_dir);

    match target_path {
        Ok(target_path) => {
            info!(
                "Processing {}",
                target_path.to_str().unwrap_or("Unable to convert")
            );
            let res = convert(&source_path, &target_path);
            if let Err(e) = res {
                error!("{}", e);
            }
        }
        Err(e) => error!("{}", e),
    }
});

// std::thread solution
let mut handles = vec![];
for source_path in paths {
    let target_path = create_target_file_path(&source_path, &target_dir)?;
    handles.push(thread::spawn(move || {
        let _ = convert(&source_path, &target_path);
    }));
}

for handle in handles {
    let _ = handle.join();
}

Comparison on 57 files:

std::threads: 23.71s user 4.19s system 356% cpu 7.835 total
rayon:        23.36s user 4.08s system 324% cpu 8.464 total
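Unlike the rayon version, the std::thread version discards both the result of convert() and the result of join() with let _ =, so a failed conversion or a panicking thread goes unnoticed in that benchmark. A minimal std-only sketch of collecting those errors instead, assuming convert returns a Result; here convert is a hypothetical stub standing in for the question's function (it fails for any file named bad.csv), and convert_all is an illustrative helper name.

```rust
use std::path::{Path, PathBuf};
use std::thread;

// Hypothetical stand-in for the question's `convert` (CSV -> JSON).
// It fails for a source named "bad.csv" so the error path is exercised.
fn convert(source: &Path, _target: &Path) -> Result<(), String> {
    if source.ends_with("bad.csv") {
        Err(format!("cannot convert {}", source.display()))
    } else {
        Ok(())
    }
}

/// Spawns one thread per (source, target) pair and returns every failure
/// instead of discarding it with `let _ =`.
fn convert_all(jobs: Vec<(PathBuf, PathBuf)>) -> Vec<String> {
    let handles: Vec<_> = jobs
        .into_iter()
        .map(|(source, target)| thread::spawn(move || convert(&source, &target)))
        .collect();

    let mut errors = Vec::new();
    for handle in handles {
        match handle.join() {
            Ok(Ok(())) => {}
            Ok(Err(e)) => errors.push(e), // convert() returned an error
            Err(_) => errors.push("worker thread panicked".to_string()),
        }
    }
    errors
}

fn main() {
    let jobs = vec![
        (PathBuf::from("in/a.csv"), PathBuf::from("out/a.json")),
        (PathBuf::from("in/bad.csv"), PathBuf::from("out/bad.json")),
    ];
    for e in convert_all(jobs) {
        eprintln!("{}", e);
    }
}
```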

Accepted answer by maxy

The documentation for rayon's install is not very clear, but its signature:

pub fn install<OP, R>(&self, op: OP) -> R where
    R: Send,
    OP: FnOnce() -> R + Send, 

tells you that it returns R, the same type R that your closure returns. So install() has to wait for the closure to finish before it can return, which means your loop converts the files one at a time.

Calling install() only makes sense if the closure itself spawns additional tasks, for example by using .par_iter() inside it. I suggest using rayon's parallel iterators directly over the list of files instead of your for loop. You don't even need to create your own thread pool; the default pool is usually fine.
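For intuition about why the default pool is usually enough: rayon's global pool defaults to one thread per logical CPU (overridable with the RAYON_NUM_THREADS environment variable), rather than one thread per file or a hand-picked 15. The sketch below is a simplified std-only analogue of that idea, not rayon's implementation (rayon uses work stealing, not a single locked queue); process_with_pool is a hypothetical helper name.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::thread;

/// Processes every item with one worker thread per logical CPU.
/// Workers repeatedly pull the next item from a shared queue until it is empty.
/// Returns the number of items processed.
fn process_with_pool<T, F>(items: Vec<T>, work: F) -> usize
where
    T: Send,
    F: Fn(T) + Sync,
{
    let workers = thread::available_parallelism().map_or(1, |n| n.get());
    let queue = Mutex::new(items);
    let done = AtomicUsize::new(0);

    thread::scope(|s| {
        for _ in 0..workers {
            s.spawn(|| loop {
                // Take one item; the lock guard is dropped at the end of this statement,
                // so the (slow) work below runs without holding the lock.
                let next = queue.lock().unwrap().pop();
                match next {
                    Some(item) => {
                        work(item);
                        done.fetch_add(1, Ordering::Relaxed);
                    }
                    None => break,
                }
            });
        }
    }); // all workers are joined when the scope ends

    done.into_inner()
}

fn main() {
    let files: Vec<String> = (1..=7).map(|i| format!("file{}.csv", i)).collect();
    let n = process_with_pool(files, |name| println!("converting {}", name));
    println!("processed {} files", n);
}
```

With 113 files this keeps the number of threads at the core count instead of spawning 113 threads at once, which is the same trade-off a parallel iterator over the file list makes for you.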

If you insist on doing it manually, you'll have to use spawn() instead of install(), and you'll probably have to move your loop into a closure passed to scope().
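To see why the original install() loop ran the files one after another, here is a std-only analogue (not rayon's code: install() runs the closure on a pool thread rather than spawning a new one, but it blocks the caller in the same way). blocking_loop waits for each task before starting the next, like calling install() in a loop; spawn_all starts every task first and waits at the end, like scope() plus spawn(). fake_convert is a hypothetical stand-in that sleeps instead of converting a file and records the peak number of tasks running at once.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;

/// Simulated conversion that records how many tasks are running concurrently.
fn fake_convert(running: &AtomicUsize, peak: &AtomicUsize) {
    let now = running.fetch_add(1, Ordering::SeqCst) + 1;
    peak.fetch_max(now, Ordering::SeqCst);
    thread::sleep(Duration::from_millis(50)); // stand-in for CSV -> JSON work
    running.fetch_sub(1, Ordering::SeqCst);
}

/// Like calling `pool.install(...)` in a loop: each call waits for its
/// closure to finish before the loop moves on, so tasks never overlap.
fn blocking_loop(tasks: usize) -> usize {
    let (running, peak) = (AtomicUsize::new(0), AtomicUsize::new(0));
    thread::scope(|s| {
        for _ in 0..tasks {
            s.spawn(|| fake_convert(&running, &peak)).join().unwrap();
        }
    });
    peak.into_inner()
}

/// Like `scope` + `spawn`: start every task first, then wait for all of them.
fn spawn_all(tasks: usize) -> usize {
    let (running, peak) = (AtomicUsize::new(0), AtomicUsize::new(0));
    thread::scope(|s| {
        for _ in 0..tasks {
            s.spawn(|| fake_convert(&running, &peak));
        }
    }); // the scope joins all spawned threads here
    peak.into_inner()
}

fn main() {
    println!("install-style loop: peak {} task(s) at once", blocking_loop(7));
    println!("spawn-all:          peak {} task(s) at once", spawn_all(7));
}
```

blocking_loop always reports a peak of 1, because each task is joined before the next one starts; that matches the roughly 98% CPU figure in the question.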