In order to speed up a certain list processing logic, I wrote a decorator that would 1) intercept incoming function call 2) take its input list, break it into multiple pieces 4) pass these pieces to the original function on seperate threads 5) combine output and return
I thought it was a pretty neat idea, until I coded it and saw there was no change in speed! Even though I see multiple cores busy on htop, multithreaded version is actually slower than the single thread version.
Does this have to do with the infamous cpython GIL?
Thanks!
from threading import Thread
import numpy as np
import time
# breaks a list into n list of lists
def split(a, n):
k, m = len(a) / n, len(a) % n
return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in xrange(n))
THREAD_NUM = 8
def parallel_compute(fn):
class Worker(Thread):
def __init__(self, *args):
Thread.__init__(self)
self.result = None
self.args = args
def run(self):
self.result = fn(*self.args)
def new_compute(*args, **kwargs):
threads = [Worker(args[0], args[1], args[2], x) for x in split(args[3], THREAD_NUM)]
for x in threads: x.start()
for x in threads: x.join()
final_res = []
for x in threads: final_res.extend(x.result)
return final_res
return new_compute
# some function that does a lot of computation
def f(x): return np.abs(np.tan(np.cos(np.sqrt(x**2))))
class Foo:
@parallel_compute
def compute(self, bla, blah, input_list):
return map(f, input_list)
inp = [i for i in range(40*1000*100)]
#inp = [1,2,3,4,5,6,7]
if __name__ == "__main__":
o = Foo()
start = time.time()
res = o.compute(None, None, inp)
end = time.time()
print 'parallel', end - start
Single thread version
import time, fast_one, numpy as np
class SlowFoo:
def compute(self, bla, blah, input_list):
return map(fast_one.f, input_list)
if __name__ == "__main__":
o = SlowFoo()
start = time.time()
res = np.array(o.compute(None, None, fast_one.inp))
end = time.time()
print 'single', end - start
And here is the multiprocessing version that gives "PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed".
import pathos.multiprocessing as mp
import numpy as np, dill
import time
def split(a, n):
k, m = len(a) / n, len(a) % n
return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in xrange(n))
def f(x): return np.abs(np.tan(np.cos(np.sqrt(x**2))))
def compute(input_list):
return map(f, input_list)
D = 2; pool = mp.Pool(D)
def parallel_compute(fn):
def new_compute(*args, **kwargs):
inp = []
for x in split(args[0], D): inp.append(x)
outputs_async = pool.map_async(fn, inp)
outputs = outputs_async.get()
outputs = [y for x in outputs for y in x]
return outputs
return new_compute
compute = parallel_compute(compute)
inp = [i for i in range(40*1000)]
if __name__ == "__main__":
start = time.time()
res = compute(inp)
end = time.time()
print 'parallel', end - start
print len(res)
Yes, when your threads are doing CPU-bound work implemented in Python (not by, say, C extensions which can release the GIL before and after marshalling/demarshalling data from Python structures), the GIL is a problem here.
I'd suggest using a multiprocessing model, a Python implementation that doesn't have it (IronPython, Jython, etc), or a different language altogether (if you're doing performance-sensitive work, there's no end of languages nearly as fluid as Python but with considerably better runtime performance).