I'm trying to have a pool of worker processes in my Pyramid application that can carry out CPU-intensive (or long-running) background tasks without bogging down the views. What I have right now works, but there is one problem: if Waitress is terminated (as happens with --reload), the workers keep lingering, and I don't know how to signal them to stop.
Edit: This doesn't seem to be an issue when using Gunicorn (or just running it from some file). Could this be a bug with Waitress?
Edit2: Well. Or Gunicorn just handles this differently, making it look better.
import multiprocessing as mp
from queue import Empty as QueueEmptyError


class MyPool(object):
    def __init__(self, processes=10, queue=None):
        self.jobqueue = queue if queue is not None else mp.Queue()
        self.procs = []
        self.running = True
        for i in range(processes):
            worker = mp.Process(target=self._worker, daemon=True)
            self.procs.append(worker)
            worker.start()

    def __del__(self):
        self.stopall()

    def stopall(self):
        self.running = False
        for worker in self.procs:
            worker.join()

    def _worker(self):
        while self.running:
            try:
                # Wait up to 1 second for a job, then loop again
                self._dojob(self.jobqueue.get(True, 1))
            except QueueEmptyError:
                pass

    def _dojob(self, taskdata):
        print(str(taskdata) + ' is happening')


class PoolMaster(object):
    def __init__(self):
        self.pools = []
        self.aqueue = mp.Queue()
        self.apool = MyPool(6, self.aqueue)
        self.pools.append(self.apool)

    def __del__(self):
        for pool in self.pools:
            pool.stopall()

    def do_something(self):
        self.aqueue.put_nowait('Something')
PoolMaster is instantiated once in my project's main() function and exposed to all views by adding it to all events.
What I tried before was adding a "poison pill" to the queue when __del__ happens, but as it turns out __del__ doesn't seem to get called at all. I don't want to use multiprocessing's own Pool because it seems to be made for running through a set workload once, not for constantly working on a queue. So, how do I stop them from running after the actual application has exited?
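For comparison, the poison-pill idea can be tied to atexit instead of __del__ (which is not guaranteed to run for objects still alive at interpreter exit). This is only a sketch; make_pool, _pill_worker, and the None sentinel are illustrative names, not part of the code above:

```python
import atexit
import multiprocessing as mp


def _pill_worker(jobqueue):
    # Block until a job arrives; a None sentinel means "shut down".
    while True:
        task = jobqueue.get()
        if task is None:
            break
        print(str(task) + ' is happening')


def make_pool(processes=3):
    # Explicit fork context, as the question targets UNIX-like systems.
    ctx = mp.get_context('fork')
    jobqueue = ctx.Queue()
    procs = [ctx.Process(target=_pill_worker, args=(jobqueue,), daemon=True)
             for _ in range(processes)]
    for p in procs:
        p.start()

    def stopall():
        # One sentinel per worker, then wait for each to drain and exit.
        for _ in procs:
            jobqueue.put(None)
        for p in procs:
            p.join()

    # atexit fires on normal interpreter shutdown, unlike __del__, which
    # may never be called for objects that are still referenced at exit.
    atexit.register(stopall)
    return jobqueue, procs, stopall
```

Registering with atexit covers normal interpreter exits; a SIGTERM from a process manager would still need a signal handler that calls stopall() (or sys.exit()) for the hook to run.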
Alright, I found a way to make it happen by changing the _worker function: when the parent process dies without ending the daemonized _worker processes, os.getppid() will return 1 instead of the parent's PID on UNIX-like systems. So every 3 seconds the worker stops waiting for new objects in the queue and checks for that. I don't feel like this is the proper way, though.