Exit from multiprocessing pool upon exception or KeyboardInterrupt?


I would like my program to exit as soon as I press Ctrl+C:

import multiprocessing
import os
import time

def sqr(a):
    time.sleep(0.2)
    print 'local {}'.format(os.getpid())
    #raise Exception()
    return a * a

pool = multiprocessing.Pool(processes=4)

try:
    r = [pool.apply_async(sqr, (x,)) for x in range(100)]
    pool.close()
    pool.join()
except:
    print 121312313
    pool.terminate()
    pool.join()

print 'main {}'.format(os.getpid())

This code doesn't work as intended: the program does not quit when I press Ctrl+C. Instead, each press prints a few KeyboardInterrupt tracebacks, and the program stays stuck forever.

Also, I would like it to exit ASAP if I uncomment #raise ... in sqr. The solutions in "Exception thrown in multiprocessing Pool not detected" do not seem to help.

Update

I think I finally ended up with this (let me know if it is wrong):

def sqr(a):
    time.sleep(0.2)
    print 'local {}'.format(os.getpid())
    if a == 20:
        raise Exception('fff')
    return a * a

pool = multiprocessing.Pool(processes=4)


r = [pool.apply_async(sqr, (x,)) for x in range(100)]

pool.close()

# Without a timeout, get() cannot respond to KeyboardInterrupt.
# get() is also needed to re-raise any exceptions thrown in the workers.
for item in r:
    item.get(timeout=999999)

# I don't think I need join since I already get everything.
pool.join()

print 'main {}'.format(os.getpid())

There is 1 answer

dano (best answer)

This happens because of a Python 2.x bug that makes the call to pool.join() uninterruptible; it works fine in Python 3.x. The usual workaround is to pass a very large timeout to join, but multiprocessing.Pool.join doesn't accept a timeout parameter, so that workaround isn't available here. Instead, you'll need to wait for each individual task in the pool to complete, passing a timeout to its wait() method:

import multiprocessing
import time
import os

Pool = multiprocessing.Pool

def sqr(a):
    time.sleep(0.2)
    print('local {}'.format(os.getpid()))
    #raise Exception()
    return a * a

pool = Pool(processes=4)

try:
    r = [pool.apply_async(sqr, (x,)) for x in range(100)]
    pool.close()
    for item in r:
        item.wait(timeout=9999999) # Without a timeout, you can't interrupt this.
except KeyboardInterrupt:
    pool.terminate()
finally:
    pool.join()

print('main {}'.format(os.getpid()))

This version can be interrupted with Ctrl+C on both Python 2 and Python 3.
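
The question also asks for an early exit when a worker raises. wait() only blocks until a task finishes and does not re-raise worker exceptions, whereas get() does. A minimal sketch of one way to combine the two requirements follows. The helper names collect and ignore_sigint are hypothetical and not part of multiprocessing, and making the workers ignore SIGINT is an optional assumption, used here so that only the parent process reacts to Ctrl+C:

```python
import multiprocessing
import os
import signal
import time


def sqr(a):
    time.sleep(0.2)
    print('local {}'.format(os.getpid()))
    if a == 20:
        raise ValueError('fff')  # simulated worker failure
    return a * a


def ignore_sigint():
    # Hypothetical pool initializer: workers ignore Ctrl+C so that only
    # the parent process sees KeyboardInterrupt.
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def collect(pool, async_results, timeout=9999999):
    """Return all results in order; terminate the pool on any failure.

    Hypothetical helper, not part of multiprocessing.
    """
    try:
        # get() re-raises an exception raised inside the worker. The
        # timeout keeps the wait interruptible on Python 2.
        values = [item.get(timeout=timeout) for item in async_results]
        pool.close()
    except BaseException:
        # Covers worker exceptions, timeouts and KeyboardInterrupt.
        pool.terminate()
        raise
    finally:
        # Safe here: the pool has been either closed or terminated.
        pool.join()
    return values


if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4, initializer=ignore_sigint)
    pending = [pool.apply_async(sqr, (x,)) for x in range(100)]
    try:
        print(collect(pool, pending))
    except ValueError as exc:
        print('worker failed: {}'.format(exc))
    print('main {}'.format(os.getpid()))
```

Because get() is called in submission order, a failure in a later task is only noticed once the earlier results have been fetched. With the workers spread over four processes this delay is short, but it is not an immediate abort.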