I'm intrigued by how much less efficient parallel loops in Python are compared to parfor in MATLAB.
Here I am presenting a simple root-finding problem, brute-forcing 10^6 initial guesses between a and b.
import numpy as np
from scipy.optimize import root
import matplotlib.pyplot as plt
import multiprocessing
# define the function to find the roots
func = lambda x: np.sin(3*np.pi*np.cos(np.pi*x)*np.sin(np.pi*x))
def forfunc(x0):
    q = [root(func, xi).x for xi in x0]
    q = np.array(q).T[0]
    return q
# variables of the problem
a = -3
b = 5
n = int(1e6)
x0 = np.linspace(a,b,n) # list of initial guesses
# the single-process loop
q = forfunc(x0)
# parallel loop
nc = 4
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
pool.close()
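(In case it matters for reproducing this: on platforms that use the spawn start method, e.g. Windows, the Pool creation has to sit under a main guard. A minimal sketch of the same parallel loop written that way:)
# same parallel loop, guarded for spawn-based platforms
if __name__ == '__main__':
    nc = 4
    pool = multiprocessing.Pool(processes=nc)
    q = np.hstack(pool.map(forfunc, np.split(x0, nc)))
    pool.close()
    pool.join()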
The single-process loop takes 1min 26s of wall time and the parallel loop takes 1min 7s. I see some improvement, as the speedup is 1.28, but the efficiency (timeloop / timeparallel / n_process) is only 0.32 in this case.
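(For reference, speedup = timeloop / timeparallel and efficiency = speedup / n_process; a quick sanity check of those numbers, using the wall times above:)
# sanity check of the reported speedup and efficiency
t_loop = 86.0      # 1min 26s single-process wall time, in seconds
t_parallel = 67.0  # 1min 7s wall time with 4 processes
nc = 4
speedup = t_loop / t_parallel  # ~1.28
efficiency = speedup / nc      # ~0.32
print(speedup, efficiency)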
What is happening here and how to improve this efficiency? Am I doing something wrong?
I also tried using dask.delayed in two ways:
import dask
# Every call is a delayed object
q = dask.compute(*[dask.delayed(func)(xi) for xi in x0])
# Every chunk is a delayed object
q = dask.compute(*[dask.delayed(forfunc)(x0i) for x0i in np.split(x0,nc)])
And here both take more time than the single-process loop. The wall time for the first try is 3min and for the second try it took 1min 27s.
What's Happening with Dask (or Spark)
From your single-process tests, your loop executes one million tasks in 90 seconds. Thus, each task takes your CPU about 90 microseconds in the average case.
In distributed computing frameworks like Dask or Spark that provide flexibility and resiliency, tasks have a small overhead associated with them. Dask's overhead is as low as 200 microseconds per task. The Spark 3.0 documentation suggests that Spark can support tasks as short as 200 milliseconds, which perhaps means Dask actually has 1000x less overhead than Spark. It sounds like Dask is actually doing really well here!
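As a rough back-of-the-envelope check of your first Dask attempt (treating 200 microseconds as the per-task overhead; the true figure depends on the scheduler and setup):
# rough estimate of pure scheduling overhead with one delayed object per guess
n_tasks = int(1e6)
overhead_per_task = 200e-6               # seconds, rough Dask per-task overhead
print(n_tasks * overhead_per_task / 60)  # ~3.3 minutes of overhead alone
That is in the same ballpark as the ~3 minutes you observed, so the per-call version is dominated by overhead rather than by the root finding itself.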
If your tasks are faster than the per-task overhead of your framework, you'll simply see worse performance using it relative to manually distributing your work across the same number of machines/cores. In this case, you're running into that scenario.
In your chunked data Dask example you have only a few tasks, so you see better performance from the reduced overhead. But you are likely either taking a small performance hit from the overhead of Dask relative to raw multiprocessing, or you're not using a Dask cluster and are running the tasks in a single process.
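If it's the latter, it's worth pointing the chunked version at a local distributed cluster explicitly, so each chunk runs in its own worker process. A rough sketch with dask.distributed (n_workers and threads_per_worker here are choices to match your cores, not requirements):
from dask.distributed import Client, LocalCluster
# one single-threaded worker process per chunk
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)  # becomes the default scheduler for dask.compute
q = np.hstack(dask.compute(*[dask.delayed(forfunc)(x0i) for x0i in np.split(x0, nc)]))
client.close()
cluster.close()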
Multiprocessing (and Dask) Should Help
Your results with multiprocessing are generally unexpected for this kind of embarrassingly parallel problem. You may want to confirm the number of physical cores on your machine and in particular make sure nothing else is actively utilizing your CPU cores. Without knowing anything else, I would guess that's the culprit.
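One quick way to check (psutil is a third-party package, assumed to be installed here; multiprocessing.cpu_count() alone reports logical cores):
import multiprocessing
import psutil  # third-party, used only for this check

print(multiprocessing.cpu_count())      # logical cores
print(psutil.cpu_count(logical=False))  # physical cores
print(psutil.cpu_percent(interval=1))   # current overall CPU utilization, %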
On my laptop with two physical cores, I ran your example with nc=2 to split into two chunks, and for the Dask version a LocalCluster of two workers with one thread per worker. It may be worth double checking you're running on a cluster. Getting a roughly 2x speedup with two processes is in line with expectations on my laptop, as is seeing minimal or no benefit from more processes for this CPU-bound task. Dask also adds a bit of overhead relative to raw multiprocessing.