Why the performance of concurrent.futures.ProcessPoolExecutor is very low?

1.4k views Asked by At

I'm trying to leverage concurrent.futures.ProcessPoolExecutor in Python3 to process a large matrix in parallel. The general structure of the code is:

class X(object):

self.matrix

def f(self, i, row_i):
    <cpu-bound process>

def fetch_multiple(self, ids):
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(self.f, i, self.matrix.getrow(i)) for i in ids]
        return [f.result() for f in as_completed(futures)]

self.matrix is a large scipy csr_matrix. f is my concurrrent function that takes a row of self.matrix and apply a CPU-bound process on it. Finally, fetch_multiple is a function that run multiple instance of f in parallel and returns the results.

The problem is that after running the script, all cpu cores are less than 50% busy (See the following screenshot):

enter image description here

Why all cores are not busy?

I think the problem is the large object of self.matrix and passing row vectors between processes. How can I solve this problem?

1

There are 1 answers

0
jsbueno On BEST ANSWER

Yes. The overhead should not be that big - but it is likely the cause of your CPUs appearing iddle (although, they should be busy passing the data around anyway).

But try the recipe here to pass a "pointer" of the object to the subprocess using shared memory.

http://briansimulator.org/sharing-numpy-arrays-between-processes/

Quoting from there:

from multiprocessing import sharedctypes
size = S.size
shape = S.shape
S.shape = size
S_ctypes = sharedctypes.RawArray('d', S)
S = numpy.frombuffer(S_ctypes, dtype=numpy.float64, count=size)
S.shape = shape

Now we can send S_ctypes and shape to a child process in multiprocessing, and convert it back to a numpy array in the child process as follows:

from numpy import ctypeslib
S = ctypeslib.as_array(S_ctypes)
S.shape = shape

It should be tricky to take care of reference counting, but I suppose numpy.ctypeslib takes care of that - so, just coordinate the passing of the actual row number to sub-processes in a way they don't work on the same data