How to use Python Ray to parallelise over a large list?


I want to parallelise the operation of a function on each element of a list using Ray. A simplified snippet is below:

import numpy as np
import time

import ray
import psutil
num_cpus = psutil.cpu_count(logical=False)
ray.init(num_cpus=num_cpus)


@ray.remote
def f(a, b, c):
    return a * b - c


def g(a, b, c):
    return a * b - c


def my_func_par(large_list):
    # arguments a and b are constant just to illustrate
    # argument c is each element of the list large_list
    [f.remote(1.5, 2, i) for i in large_list]


def my_func_seq(large_list):
    # arguments a and b are constant just to illustrate
    # argument c is each element of the list large_list
    [g(1.5, 2, i) for i in large_list]

my_list = np.arange(1, 10000)

s = time.time()
my_func_par(my_list)
print(time.time() - s)
>>> 2.007

s = time.time()
my_func_seq(my_list)
print(time.time() - s)
>>> 0.0372

The problem is, when I time my_func_par, it is much slower (~54x, as can be seen above) than my_func_seq. One of the authors of Ray answers a comment on this blog that seems to explain it: what I am doing is setting up len(large_list) separate tasks, which is the wrong approach.

How do I use Ray and modify the code above to run it in parallel (maybe by splitting large_list into chunks, with the number of chunks equal to the number of CPUs)?

EDIT: There are two important criteria in this question:

  • The function f needs to accept multiple arguments
  • It may be necessary to use ray.put(large_list) so that the large_list variable can be stored in shared memory rather than copied to each processor (a rough sketch of what I mean is below)
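
For reference, this is roughly how I understand ray.put would be used (an untested sketch on my part; the arr_ref name is just for illustration):

import numpy as np
import ray

ray.init()

@ray.remote
def f(a, b, arr):
    # arr arrives already de-referenced from the object store
    return a * b - arr

large_list = np.arange(1, 10000)

# store the array once in shared memory; tasks receive a reference
# instead of each getting their own copy
arr_ref = ray.put(large_list)
result = ray.get(f.remote(1.5, 2, arr_ref))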

There are 2 answers

Sang:

The reason the parallelized version is slower is that running Ray tasks unavoidably has some overhead (although Ray puts a lot of effort into optimizing it). This is because running things in parallel requires inter-process communication, serialization, and so on.

That being said, if your function is really fast, so fast that running it takes less time than the other overhead of distributed computation, you will not see a benefit. Your code is exactly this case because the function f is really tiny; I assume it takes less than a microsecond to run.

This means you should make the function f computationally heavier in order to get a benefit from parallelization. Your proposed solution might not work because, even after chunking, the work per task might still be light enough that the overhead dominates, depending on your list size.
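
As a rough illustration of the chunking idea from the question (my own sketch, not from the original code; the f_chunk name and the np.array_split choice are assumptions), batching elements so each task does a meaningful amount of work looks roughly like this:

import numpy as np
import psutil
import ray

num_cpus = psutil.cpu_count(logical=False)
ray.init(num_cpus=num_cpus)

@ray.remote
def f_chunk(a, b, chunk):
    # one task processes a whole chunk, so the per-task overhead is paid
    # once per chunk instead of once per element
    return [a * b - c for c in chunk]

def my_func_par(large_list):
    chunks = np.array_split(large_list, num_cpus)
    futures = [f_chunk.remote(1.5, 2, chunk) for chunk in chunks]
    # ray.get blocks until every chunk result is available
    results = ray.get(futures)
    return [x for chunk_result in results for x in chunk_result]

Even then, whether this beats the sequential loop depends on how expensive the per-element work actually is.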

zhz:

To add to what Sang said above:

Ray's distributed multiprocessing.Pool (ray.util.multiprocessing.Pool) supports a fixed-size pool of Ray actors for easier parallelization.

import numpy as np
import time

import ray
from ray.util.multiprocessing import Pool
pool = Pool()

def f(x):
    # time.sleep(1)
    return 1.5 * 2 - x

def my_func_par(large_list):
    pool.map(f, large_list)

def my_func_seq(large_list):
    [f(i) for i in large_list]

my_list = np.arange(1, 10000)

s = time.time()
my_func_par(my_list)
print('Parallel time: ' + str(time.time() - s))

s = time.time()
my_func_seq(my_list)
print('Sequential time: ' + str(time.time() - s))

With the above code, my_func_par runs much faster (about 0.1 seconds). If you play with the code and make f(x) slower with something like time.sleep, you can see the clear advantage of multiprocessing.
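
If f also needs to take the constant arguments a and b from the question (the first criterion in the edit), one option that should work with pool.map is to bind them up front with functools.partial. This is a sketch of my own, not part of the original answer:

from functools import partial

import numpy as np
from ray.util.multiprocessing import Pool

def f(a, b, c):
    return a * b - c

pool = Pool()
my_list = np.arange(1, 10000)

# bind the constant arguments so pool.map only has to supply c;
# keeping f at module level keeps the partial object picklable for the workers
results = pool.map(partial(f, 1.5, 2), my_list)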