Asynchronous multiprocessing with a worker pool in Python: how to keep going after timeout?


I would like to run a number of jobs using a pool of processes and apply a given timeout after which a job should be killed and replaced by another working on the next task.

I have tried the multiprocessing module, which offers a way to run a pool of workers asynchronously (e.g. using map_async), but there I can only set a "global" timeout after which all processes would be killed.

Is it possible to have an individual timeout after which only a single process that takes too long is killed and a new worker is added to the pool again instead (processing the next task and skipping the one that timed out)?

Here's a simple example to illustrate my problem:

def Check(n):
  import time
  if n % 2 == 0: # select some (arbitrary) subset of processes
    print "%d timeout" % n
    while 1:
      # loop forever to simulate some process getting stuck
      pass
  print "%d done" % n
  return 0

from multiprocessing import Pool
pool = Pool(processes=4)
result = pool.map_async(Check, range(10))
print result.get(timeout=1)    

After the timeout all workers are killed and the program exits. I would like instead that it continues with the next subtask. Do I have to implement this behavior myself or are there existing solutions?

Update

It is possible to kill the hanging workers and they are automatically replaced. So I came up with this code:

import multiprocessing

jobs = pool.map_async(Check, range(10))
while 1:
  try:
    print "Waiting for result"
    result = jobs.get(timeout=1)
    break # all clear
  except multiprocessing.TimeoutError:
    # kill all processes
    for c in multiprocessing.active_children():
      c.terminate()
print result

The problem now is that the loop never exits; even after all tasks have been processed, calling get yields a timeout exception.

There are 3 answers

Vikas Gautam On

Try a construction in which each process is joined with a timeout on a separate thread. The main program then never blocks on a stuck process, and a process that exceeds its timeout can be terminated. Note that join(timeout) by itself only stops waiting; a process that is still alive afterwards has to be terminated explicitly. This technique combines the threading and multiprocessing modules.

Here is my way to keep a fixed number of worker threads running at a time. It may be unusual compared with the techniques in the other answers, but it may be worth considering. For the sake of explanation, I use the scenario of crawling 5 websites at a time.

Here it is:

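# Import dependencies.
from multiprocessing import Process
from threading import Thread
import threading

# Crawler function
def crawler(domain):
    # define the crawling technique here; scrapeddata is a placeholder for its result
    scrapeddata = domain
    output.write(scrapeddata + "\n")
    output.flush() # the child process exits without flushing file buffers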

Next is the threadController function. It controls how many threads are active: it keeps starting threads until threadNum (5) of them are running, and it does not return until all active threads (as counted by threading.activeCount()) have finished.

It keeps threadNum (5) startProcess threads running; each of these threads starts one process from processList and joins it with a timeout of 60 seconds. Once threadController is running, there are two threads that do not count towards the limit of 5: the main thread and the threadController thread itself. That is why the loop condition uses threading.activeCount() != 2.

def threadController():
    print "Thread count before child thread starts is:-", threading.activeCount(), len(processList)
    # starting the first thread. This will make the activeCount=3
    Thread(target = startProcess).start()
    # loop while the process list is not empty OR active threads have not finished up.
    while len(processList) != 0 or threading.activeCount() != 2:
        if (threading.activeCount() < (threadNum + 2) and # if fewer threads are active than the limit AND
            len(processList) != 0):                       # processList is not empty
            Thread(target = startProcess).start()         # start startProcess as a separate thread **

The startProcess function runs in its own thread and starts one process from processList. Running it in a separate thread (the line marked ** above) means that the thread waiting on the process is not the controller: joining the process with a timeout of 60 seconds blocks only that startProcess thread, while threadController keeps running and can start further threads.

def startProcess():
    try:
        pr = processList.pop(0)
    except IndexError: # another thread already took the last process
        return
    pr.start()
    pr.join(60.00) # wait for the process for at most 60 seconds (float)
    if pr.is_alive(): # join() timing out does not stop the process
        pr.terminate()

if __name__ == '__main__':
    # a file holding a list of domains, one per line
    domains = open("Domains.txt", "r").read().split("\n")
    output = open("test.txt", "a")
    processList = [] # processes waiting to be started
    threadNum = 5 # number of thread-initiated processes to run at one time

    # build the process list
    for domain in domains:
        p = Process(target = crawler, args = (domain.strip(),))
        processList.append(p)

    # start threadController as a separate thread
    mt = Thread(target = threadController)
    mt.start()
    mt.join() # block until the threadController thread finishes

    output.close()
    print "Done"

Besides keeping a fixed number of threads running, my aim was also to avoid stuck threads or processes lingering in memory. I did this using the join timeout. My apologies for any typos.
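
A side note on the join timeout: Process.join(timeout) only stops waiting, it does not stop the child, so a process that is still alive afterwards has to be terminated explicitly. The following is a minimal sketch of that pattern, not a definitive implementation. It is written for Python 3 (unlike the Python 2 listings here) and assumes a POSIX system where the "fork" start method is available. The helper name run_with_timeout is hypothetical, and time.sleep(60) merely stands in for a stuck task.

```python
import multiprocessing
import time


def run_with_timeout(target, args=(), timeout=1.0):
    """Run target(*args) in a child process; return True if it finished in time."""
    # Assumption: the "fork" start method exists (POSIX only).
    ctx = multiprocessing.get_context("fork")
    proc = ctx.Process(target=target, args=args)
    proc.start()
    proc.join(timeout)      # wait at most `timeout` seconds
    if proc.is_alive():     # join() returned because the timeout expired
        proc.terminate()    # ask the operating system to stop the child
        proc.join()         # reap it so that no zombie process is left behind
        return False
    return True


if __name__ == "__main__":
    # A short task that finishes well within its timeout.
    print(run_with_timeout(time.sleep, (0.1,), timeout=2.0))
    # A stand-in for a stuck task; it is terminated after half a second.
    print(run_with_timeout(time.sleep, (60,), timeout=0.5))
```

On Windows there is no fork start method; there the default context would have to be used, and the target must be an importable module-level function.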

I hope this construction helps someone.

Regards,

Vikas Gautam

noxdafox On

The pebble library was built to solve this kind of problem. Its process pool supports a timeout on each task, so timed-out tasks can be detected and recovered from easily.

from pebble import ProcessPool
from concurrent.futures import TimeoutError

with ProcessPool() as pool:
    # function is a placeholder for your own callable
    future = pool.schedule(function, args=[1,2], timeout=5)

try:
    result = future.result()
except TimeoutError as error:
    print "Function took longer than %d seconds" % error.args[1]

For your specific example:

from pebble import ProcessPool
from concurrent.futures import TimeoutError

results = []

with ProcessPool(max_workers=4) as pool:
    future = pool.map(Check, range(10), timeout=5)

    iterator = future.result()

    # iterate over all results, if a computation timed out
    # print it and continue to the next result
    while True:
        try:
            result = next(iterator)
            results.append(result)
        except StopIteration:
            break  
        except TimeoutError as error:
            print "function took longer than %d seconds" % error.args[1]

print results

luart On

Currently, Python provides no native means of controlling the execution time of each task in the pool from outside the worker itself.
So the easy way is to use wait_procs from the psutil module and implement the tasks as subprocesses.
If nonstandard libraries are not desirable, then you have to implement your own pool on top of the subprocess module, with the work loop in the main process poll()-ing each worker and performing the required actions.

As for the updated problem, the pool becomes corrupted if you directly terminate one of the workers (arguably a bug in the interpreter implementation, because such behavior should not be allowed): the worker is recreated, but the task is lost and the pool can no longer be joined. You have to terminate the whole pool and then recreate it for further tasks:

import multiprocessing
from multiprocessing import Pool

while True:
    pool = Pool(processes=4)
    jobs = pool.map_async(Check, range(10)) # note: each retry resubmits every task
    print "Waiting for result"
    try:
        result = jobs.get(timeout=1)
        break # all clear
    except multiprocessing.TimeoutError:
        # kill all processes
        pool.terminate()
        pool.join()
print result
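
One way to avoid resubmitting everything, sketched here under stated assumptions rather than as a definitive fix: submit the tasks with apply_async in batches no larger than the pool, so that every task in a batch starts right away, treat whatever has not finished at the batch deadline as stuck, and terminate and recreate the pool between batches. The sketch is written for Python 3 and assumes a POSIX system with the "fork" start method; the name map_with_timeout is hypothetical, and time.sleep(60) stands in for a stuck task.

```python
import multiprocessing
import time


def map_with_timeout(func, items, workers=4, timeout=1.0):
    """Apply func to items in batches of `workers`, one fresh pool per batch.

    Returns (results, timed_out): a list of (item, value) pairs and a list of
    the items whose call had not finished `timeout` seconds after submission.
    """
    # Assumption: the "fork" start method exists (POSIX only).
    ctx = multiprocessing.get_context("fork")
    results, timed_out = [], []
    for start in range(0, len(items), workers):
        batch = items[start:start + workers]
        pool = ctx.Pool(processes=workers)
        try:
            jobs = [(item, pool.apply_async(func, (item,))) for item in batch]
            deadline = time.monotonic() + timeout  # one deadline per batch
            for item, job in jobs:
                remaining = max(0.0, deadline - time.monotonic())
                try:
                    results.append((item, job.get(timeout=remaining)))
                except multiprocessing.TimeoutError:
                    timed_out.append(item)
        finally:
            pool.terminate()  # stops any worker still stuck in this batch
            pool.join()
    return results, timed_out


if __name__ == "__main__":
    # Sleep durations as tasks: the 60-second ones stand in for stuck jobs.
    done, skipped = map_with_timeout(time.sleep, [0, 60, 0, 60, 0],
                                     workers=2, timeout=1.0)
    print(done)
    print(skipped)
```

The price is a new pool for every batch, and a batch with a stuck task always lasts the full timeout, so this trades throughput for simplicity.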

UPDATE

Pebble is an excellent and handy library that solves the issue. Pebble is designed for the asynchronous execution of Python functions, whereas PyExPool is designed for the asynchronous execution of modules and external executables, though both can be used interchangeably.

When third-party dependencies are not desirable, PyExPool can be a good choice: it is a single-file, lightweight implementation of a Multi-process Execution Pool with per-Job and global timeouts, the option to group Jobs into Tasks, and other features.
PyExPool can be embedded into your sources and customized. It has a permissive Apache 2.0 license and is of production quality, being used in the core of a high-load scientific benchmarking framework.