How to use Ray parallelism within a class in Python?


I want to use the Ray task method rather than the Ray actor method to parallelise a method within a class. The reason is that the latter seems to require changing how the class is instantiated (as shown here). A toy code example is below, followed by the error it produces.

import numpy as np
import ray


class MyClass(object):
    
    def __init__(self):
        ray.init(num_cpus=4)
    
    @ray.remote
    def func(self, x, y):
        return x * y
    
    def my_func(self):
        a = [1, 2, 3]
        b = np.random.normal(0, 1, 10000)
        result = []
        # we wish to parallelise over the array `a`
        for sub_array in np.array_split(a, 3):
            result.append(self.func.remote(sub_array, b))
        return result

mc = MyClass()
mc.my_func()
>>> TypeError: missing a required argument: 'y'

The error arises because ray does not seem to be "aware" of the class, and so it expects an argument self.

The code works fine if we do not use classes:

@ray.remote
def func(x, y):
    return x * y

def my_func():
    a = [1, 2, 3, 4]
    b = np.random.normal(0, 1, 10000)
    result = []
    # we wish to parallelise over the list `a`
    # split `a` and send each chunk to a different processor
    for sub_array in np.array_split(a, 4):
        result.append(func.remote(sub_array, b))
    return result


res = my_func()
ray.get(res)
>>> [array([-0.41929678, -0.83227786, -2.69814232, ..., -0.67379119,
        -0.79057845, -0.06862196]),
 array([-0.83859356, -1.66455572, -5.39628463, ..., -1.34758239,
        -1.5811569 , -0.13724391]),
 array([-1.25789034, -2.49683358, -8.09442695, ..., -2.02137358,
        -2.37173535, -0.20586587]),
 array([ -1.67718712,  -3.32911144, -10.79256927, ...,  -2.69516478,
         -3.1623138 ,  -0.27448782])]

We see the output is a list of 4 arrays, as expected. How can I get MyClass to work with parallelism using ray?


There are 2 answers

1
Alex On

A few tips:

  1. It's generally recommended that you only use the ray.remote decorator on functions or classes in python (not bound methods).

  2. You should be very careful about calling ray.init inside the constructor of a class, since ray.init is not idempotent (which means your program will fail if you create multiple instances of MyClass). Instead, make sure ray.init runs only once in your program.

I think there are two ways of achieving the results you're going for with Ray here.

You could move func out of the class, so it becomes a plain function instead of a bound method. Note that in this approach the MyClass instance is serialized (copied) when it is passed to func, which means that changes func makes to it will not be reflected anywhere outside the function. In your simplified example, this doesn't appear to be a problem. This approach makes it easiest to achieve the most parallelism.

@ray.remote
def func(obj, x, y):
    return x * y


class MyClass(object):
    def my_func(self):
        ...
        # we wish to parallelise over the array `a`
        for sub_array in np.array_split(a, 3):
            result.append(func.remote(self, sub_array, b))
        return result

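The other approach you could consider is to use async actors. In this approach, the Ray actor handles concurrency via asyncio, but this comes with the limitations of asyncio: coroutines run concurrently on a single thread, so CPU-bound work such as x * y is interleaved rather than executed in parallel.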

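import asyncio

import numpy as np
import ray


@ray.remote(num_cpus=4)
class MyClass(object):
    async def func(self, x, y):
        return x * y

    async def my_func(self):
        a = [1, 2, 3]
        b = np.random.normal(0, 1, 10000)
        # we wish to run func concurrently over the array `a`;
        # inside the actor `self` is the plain instance, so call func directly
        coros = [self.func(sub_array, b) for sub_array in np.array_split(a, 3)]
        return await asyncio.gather(*coros)

# usage, after ray.init() has been called once:
# mc = MyClass.remote()
# result = ray.get(mc.my_func.remote())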
1
user18722159 On

Please see this code:

import time

import ray


@ray.remote
class Prime:
    # Constructor
    def __init__(self, number):
        self.num = number

    def SumPrime(self, num):
        # Sum the primes in [3, num) using trial division
        tot = 0
        for i in range(3, num):
            c = 0
            for j in range(2, int(i**0.5) + 1):
                if i % j == 0:
                    c = c + 1
            if c == 0:
                tot = tot + i
        return tot


num = 1000000
start = time.time()
# create an actor instance of the Prime class
prime = Prime.remote(num)

# fetch the result before reading the clock, so the duration includes the remote call
sum_prime = ray.get(prime.SumPrime.remote(num))
print("duration =", time.time() - start, "\nsum_prime = ", sum_prime)