Parallelizing with joblib - Performance saturation and general considerations


I am using joblib in an effort to gain some efficiency on the simple task of constructing probability densities for discrete data. In short, I am puzzled by the fact that my performance improvement saturates at 2 parallel processes, and nothing is gained by adding more. I am also curious about other possible approaches to optimizing this program. I'll first run through the specifics of the problem in a bit of detail.

I consider a binary array X of shape (n_samples, n_features) and a vector y of classification labels. For the purpose of the experiment, this will do:

import numpy as np

n_samples, n_features = 20000, 20   # values varied in the experiments below
X = np.random.randint(0, 2, size=[n_samples, n_features])
y = np.random.randint(0, 10, size=[n_samples,])

The function joint_probability_binary takes as input a column of the feature array X (an individual feature) and the label vector y and outputs their joint distribution. Nothing fancy.

def joint_probability_binary(x, y):

    labels    = list(set(y))
    joint = np.zeros([len(labels), 2])

    for i in range(y.shape[0]):
        joint[y[i], x[i]] += 1

    return joint / float(y.shape[0])

Now, I would like to apply joint_probability_binary to every feature (every column) of X. My understanding is that this task (given a large enough value of n_samples) would be coarse grained enough for multi-processing parallelism. I wrote a sequential and a parallel function to perform this task.

from joblib import Parallel, delayed

def joints_sequential(X, y):
    return [joint_probability_binary(X[:,i],y) for i in range(X.shape[1])]

def joints_parallel(X, y, n_jobs):
    return Parallel(n_jobs=n_jobs, verbose=0)(
        delayed(joint_probability_binary)(x=X[:, i], y=y)
        for i in range(X.shape[1]))

I adapted the timing function written by Guido van Rossum himself, as presented here, as follows:

import time

def timing(f, n, **kwargs):
    r = range(n)
    t1 = time.clock()
    for i in r:
        f(**kwargs)
        f(**kwargs)
        f(**kwargs)
        f(**kwargs)
        f(**kwargs)
        f(**kwargs)
        f(**kwargs)
        f(**kwargs)
        f(**kwargs)
        f(**kwargs)
    t2 = time.clock()
    return round(t2 - t1, 3)

Lastly, to study the change in performance and its dependence on the number of jobs, I run

tseq = timing(joints_sequential,10, X=X,y=y)
print('Sequential list comprehension - Finished in %s sec' %tseq)

for nj in range(1,9):
    tpar = timing(joints_parallel,10, X=X, y=y, n_jobs=nj)
    print('Parallel execution - %s jobs - Finished in %s sec' %(nj,tpar))

For n_samples = 20000 and n_features = 20, I get

Sequential list comprehension - Finished in 60.778 sec
Parallel execution - 1 jobs - Finished in 61.975 sec
Parallel execution - 2 jobs - Finished in 6.446 sec
Parallel execution - 3 jobs - Finished in 7.516 sec
Parallel execution - 4 jobs - Finished in 8.275 sec
Parallel execution - 5 jobs - Finished in 8.953 sec
Parallel execution - 6 jobs - Finished in 9.962 sec
Parallel execution - 7 jobs - Finished in 10.382 sec
Parallel execution - 8 jobs - Finished in 11.321 sec

1.

This result confirms that there's quite a bit to be gained from parallelizing this task (running this on OS X with a 2 GHz Intel Core i7 with 4 cores). What I find most striking, however, is that performance already saturates at n_jobs = 2. Given the size of each task, I find it difficult to think that this could be caused by joblib overhead alone, but then again my intuition is limited. I repeated the experiment with larger arrays, n_samples = 200000 and n_features = 40, and this leads to the same behavior:

Sequential list comprehension - Finished in 1230.172 sec
Parallel execution - 1 jobs - Finished in 1198.981 sec
Parallel execution - 2 jobs - Finished in 94.624 sec
Parallel execution - 3 jobs - Finished in 95.1 sec
...

Does anybody have an intuition about why this might be the case (given that my overall approach is reasonable enough)?

2.

Lastly, in terms of overall optimization, what would be other ways to improve the performance of a program of this kind? I suspect there would be a lot to be gained from writing a Cython implementation of the function that computes the joint probability, but I have no experience with it.


There are 2 answers

Answer by K. Steimel:

My experience is that this is usually because you're oversubscribing cores. On my desktop with an i7-3770 I'm getting the following:

Sequential list comprehension - Finished in 25.734 sec
Parallel execution - 1 jobs - Finished in 25.532 sec
Parallel execution - 2 jobs - Finished in 4.302 sec
Parallel execution - 3 jobs - Finished in 4.178 sec
Parallel execution - 4 jobs - Finished in 4.521 sec

Without knowing more about your system, I'm not able to help much. However, laptop processors often have more logical cores than physical cores due to hyperthreading or similar technologies, and this is not a task that does well with hyperthreading: nothing is blocked by IO here, so the extra logical cores don't get much of a chance to do useful work, and you won't see any increase in performance from using them.
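
To check whether your machine exposes more logical cores than physical cores, something like this works (just a quick sketch; psutil is a third-party package):

import os

# os.cpu_count() reports logical cores, i.e. it includes hyperthreads.
print("logical cores: ", os.cpu_count())

try:
    import psutil  # third-party, only needed to query physical cores
    print("physical cores:", psutil.cpu_count(logical=False))
except ImportError:
    print("install psutil to see the physical-core count")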

You could also have a CPU that automatically increases its clock rate when only one or two cores are heavily utilized, but drops it when all of them are. This could give you extra performance with two cores.

To get more performance I'd recommend writing your joint_probability_binary() function as a NumPy ufunc using numpy.frompyfunc(): https://docs.scipy.org/doc/numpy/reference/ufuncs.html
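
For example, a loop-free version along these lines (using the np.add ufunc's .at() method rather than frompyfunc(), and assuming, as the original code does, that the labels are the integers 0..K-1) might already be considerably faster:

import numpy as np

def joint_probability_binary_vec(x, y):
    # Equivalent to joint[y[i], x[i]] += 1 for every i, but done in one
    # unbuffered ufunc call instead of a Python loop.
    # Assumes (like the original) that the labels are the integers 0..K-1.
    joint = np.zeros((np.unique(y).size, 2))
    np.add.at(joint, (y, x), 1)
    return joint / float(y.shape[0])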

Numba could also help, but I've never used it: http://numba.pydata.org/numba-doc/0.35.0/index.html

Answer by kostyfisik:

Since this SO page was the first result for my Google search "joblib performance", I did some investigation.

Does anybody have an intuition about why this might be the case (given that my overall approach is reasonable enough)?

In my opinion the problem is memory-bound, and the picture is muddied by an unclear measurement. I ran the original code and measured the run time externally via time python3 joblib_test.py, keeping all but one of the evaluations in joblib_test.py commented out. On my 4-core CPU I used n_samples = 2000000 and n_features = 40, and reduced the number of repetitions:

  1. Sequential list comprehension - Finished in 54.911 sec
    real 0m55.307s

  2. Parallel execution - 4 jobs - Finished in 2.515 sec
    real 0m53.519s

It clearly shows that the actual wall-clock run time is almost identical in both cases: time.clock() counts only the CPU time spent in the parent process, so the work done in joblib's worker processes never shows up in the Python-side measurement.
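
For comparison, a wall-clock variant of the question's timing helper (a minimal sketch using time.perf_counter() instead of time.clock()) should give numbers much closer to the external time measurement:

import time

def timing_wall(f, n, **kwargs):
    # Same idea as the timing() helper above, but measuring elapsed
    # wall-clock time, so work done in joblib's worker processes is
    # counted as well.
    t1 = time.perf_counter()
    for _ in range(n):
        f(**kwargs)
    t2 = time.perf_counter()
    return round(t2 - t1, 3)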

Lastly, in terms of overall optimization, what would be other ways to improve the performance of a program of this kind?

Using numba (i.e. import numba, decorate the real worker with @numba.jit(nopython=True, cache=True), and make minor modifications to the worker) leads to a speedup by a factor of 7!

  1. Sequential list comprehension (mod) - Finished in 7.665 sec
    real 0m7.167s

  2. Parallel execution (mod) - 4 jobs - Finished in 2.004 sec
    real 0m9.143s

Once again, this nicely illustrates that the task is memory-bandwidth bound. For the optimized version there is even some overhead from using 4 cores.

Full code example:

n_samples = 2000000
n_features = 40

print("n_samples = ", n_samples, "  n_features = ", n_features)

import numpy as np
# X = np.random.randint(0,2,size=[n_samples,n_features])
# y = np.random.randint(0,10,size=[n_samples,])

def joint_probability_binary(x, y):

    labels    = list(set(y))
    joint = np.zeros([len(labels), 2])

    for i in range(y.shape[0]):
        joint[y[i], x[i]] += 1

    return joint / float(y.shape[0])

import numba
@numba.jit(nopython=True,cache=True)
def joint_probability_binary_mod(x, y):
    labels    = np.unique(y)
    joint = np.zeros((labels.size, 2))

    for i in range(y.shape[0]):
        joint[y[i], x[i]] += 1

    return joint / float(y.shape[0])

from joblib import Parallel, delayed

def joints_sequential(the_job):
    X = np.random.randint(0,2,size=[n_samples,n_features])
    y = np.random.randint(0,10,size=[n_samples,])
    return [the_job(X[:,i],y) for i in range(X.shape[1])]


def joints_parallel(n_jobs, the_job,batch_size='auto'):
    X = np.random.randint(0,2,size=[n_samples,n_features])
    y = np.random.randint(0,10,size=[n_samples,])
    return Parallel(n_jobs=n_jobs, verbose=0,batch_size=batch_size)(
        delayed(the_job)(x = X[:,i],y = y) 
        for i in range(X.shape[1])
    )

import time

def timing(f, n, **kwargs):
    r = range(n)
    t1 = time.clock()
    for i in r:
        res = f(**kwargs)
    t2 = time.clock()
    #print(np.sum(res))
    return round(t2 - t1, 3)

ttime = 0

# tseq = timing(joints_sequential,1, the_job=joint_probability_binary_mod)
# print('Sequential list comprehension (mod) - Finished in %s sec' %tseq)
# ttime+=tseq

for nj in range(4,5):
    tpar = timing(joints_parallel,1,n_jobs=nj,
                  the_job=joint_probability_binary_mod,
                  batch_size = int(n_samples/nj))
    print('Parallel execution (mod) - %s jobs - Finished in %s sec' %(nj,tpar))
    ttime+=tpar

# tseq = timing(joints_sequential,1, the_job=joint_probability_binary)
# print('Sequential list comprehension - Finished in %s sec' %tseq)
# ttime+=tseq

# for nj in range(4,5):
#     tpar = timing(joints_parallel,1,n_jobs=nj, the_job=joint_probability_binary)
#     print('Parallel execution - %s jobs - Finished in %s sec' %(nj,tpar))
#     ttime+=tpar

print("total time measured by Python",ttime)