The `ThreadedWorkerQueue.add_worker()` method blocks until a `Worker` has been consumed. Is there a clean design that lets me add new workers to the `ThreadedWorkerQueue` without blocking the thread that calls `add_worker()`, while still using conditions?
Here's a short SSCCE:
```python
import time
import threading

class Worker(object):
    def work(self):
        pass

class TimeWorker(Worker):
    def __init__(self, seconds):
        super(TimeWorker, self).__init__()
        self.seconds = seconds

    def work(self):
        for i in xrange(self.seconds):
            print "Working ... (%d)" % i
            time.sleep(1)

class ThreadedWorkerQueue(threading.Thread):
    def __init__(self):
        super(ThreadedWorkerQueue, self).__init__()
        self.condition = threading.Condition()
        self.workers = []
        self.running = False

    def add_worker(self, worker):
        with self.condition:
            self.workers.append(worker)
            self.condition.notify()

    def stop(self):
        with self.condition:
            self.running = False
            self.condition.notify()
        self.join()

    def consume(self):
        if self.workers:
            worker = self.workers.pop(0)
            worker.work()

    def run(self):
        self.running = True
        while True:
            with self.condition:
                if not self.running:
                    break
                self.condition.wait()
                self.consume()

def main():
    queue = ThreadedWorkerQueue()
    queue.start()
    queue.add_worker(TimeWorker(3))
    time.sleep(1)
    tstart = time.time()
    queue.add_worker(TimeWorker(2))
    print "Blocked", time.time() - tstart, "seconds until worker was added."
    queue.stop()

main()
```
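In the SSCCE, `run()` calls `consume()` while it still holds `self.condition`, so `add_worker()` cannot enter its own `with self.condition:` block until the current worker has finished. Below is a minimal sketch of one condition-based alternative, assuming Python 3; `ConditionWorkerQueue` and `SleepWorker` are hypothetical names. The consumer pops a worker while holding the lock, releases it, and only then calls `work()`. Because it waits in a `while` loop on the predicate, a `notify()` sent before the consumer reaches `wait()` is not lost either.

```python
import threading
import time
from collections import deque


class ConditionWorkerQueue(threading.Thread):
    """Hypothetical consumer that holds its Condition only around the deque.

    A worker is any object with a work() method.
    """

    def __init__(self):
        super().__init__()
        self.condition = threading.Condition()
        self.workers = deque()
        self.running = True  # set here so a stop() before run() is not lost

    def add_worker(self, worker):
        # Blocks only while the consumer is checking or popping the deque.
        with self.condition:
            self.workers.append(worker)
            self.condition.notify()

    def stop(self):
        # Workers that have not started yet are discarded.
        with self.condition:
            self.running = False
            self.condition.notify()

    def run(self):
        while True:
            with self.condition:
                # Re-check the predicate in a loop: this guards against
                # spurious wake-ups and notify() calls made before wait().
                while self.running and not self.workers:
                    self.condition.wait()
                if not self.running:
                    break
                worker = self.workers.popleft()
            # The lock is released here, so producers never wait for work().
            worker.work()


class SleepWorker(object):
    def __init__(self, seconds):
        self.seconds = seconds

    def work(self):
        time.sleep(self.seconds)


q = ConditionWorkerQueue()
q.start()
q.add_worker(SleepWorker(0.5))
time.sleep(0.1)  # give the consumer time to start the first worker
t0 = time.time()
q.add_worker(SleepWorker(0.2))
blocked = time.time() - t0
q.stop()
q.join()
print("Blocked %.3f seconds until worker was added." % blocked)
```

To let pending workers finish after `stop()` instead of discarding them, the `if not self.running` check in `run()` could become `if not self.workers`.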
Edit: OK, so my original idea was that a Condition can wake the thread up whenever it can continue consuming Workers. This is the basic principle of the producer/consumer design: avoid continuous polling and only do the work when there is work to do.

Just now I had another idea: use a lock that is acquired by default and is released whenever new workers can be consumed. But I'm not sure whether this is a good way to do it. Can someone spot an issue (e.g. potential deadlocks)?
The full code is on GitHub: https://github.com/NiklasRosenstein/async/blob/73828ecaa2990a71b63caf93c32f9cce5ec11d27/async.py#L686-L750
```python
import thread      # Python 2; thread.error is raised when releasing an unlocked Lock
import threading
import warnings

# WorkerQueue is defined elsewhere in async.py (see the link above).

class ThreadedWorkerQueue(WorkerQueue, threading.Thread):
    r""" This class implements the consumer design, introducing :class:`Worker`
    objects to start working as soon as there are new workers available. Every
    object adding Workers to this queue is a producer. """

    def __init__(self):
        WorkerQueue.__init__(self)
        threading.Thread.__init__(self)
        self.lock = threading.Lock()

    def __del__(self):
        if self.running:
            self.stop()
            warnings.warn("ThreadedWorkerQueue not stopped before it was lost.")

    def notify(self):
        r""" Notify the ThreadedWorkerQueue that processing can be continued.
        It is usually not necessary to call this method manually. """
        try:
            self.lock.release()
        except (thread.error, RuntimeError):
            pass

    def stop(self, join=True, clear=False):
        r""" Interrupt the thread in its doing, pausing the thread's actions
        until :meth:`start` is called again. All remaining workers are kept
        alive unless *clear* is specified True. """
        if clear: self.workers.clear()
        self.running = False
        self.notify()
        if join: self.join()

    def consume(self):
        r""" Just like :meth:`WorkerQueue.work_off`, but doesn't override
        the value of :prop:`running`. """
        while self.workers:
            worker = self.workers.popleft()
            self.current = worker
            worker.work()
            self.current = None

    # WorkerQueue

    def add_worker(self, worker):
        super(ThreadedWorkerQueue, self).add_worker(worker)
        self.notify()

    # threading.Thread

    def start(self):
        r""" Overrides :meth:`threading.Thread.start` to allow starting
        the ThreadedWorkerQueue multiple times. """
        threading.Thread.__init__(self)
        return threading.Thread.start(self)

    def run(self):
        self.running = True
        while True:
            self.consume()
            if not self.running: break
            self.lock.acquire()
```
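The lock in the edit is being used as a one-shot wake-up flag. `threading.Event` expresses that directly, and setting an already-set event is harmless, so no exception has to be swallowed. The following is a sketch of that swapped-in technique, not the code from the linked repository; it assumes Python 3, and `EventWorkerQueue` and `CallWorker` are hypothetical names. The ordering inside `run()` matters: the flag is cleared before `running` and the deque are checked, so any `notify()` that arrives afterwards makes `wait()` return.

```python
import threading
from collections import deque


class EventWorkerQueue(threading.Thread):
    """Hypothetical variant of the lock-as-signal idea using threading.Event."""

    def __init__(self):
        super().__init__()
        self.workers = deque()  # append() and popleft() are thread-safe
        self.wakeup = threading.Event()
        self.running = True

    def notify(self):
        # Unlike releasing an unlocked Lock, re-setting an Event never raises.
        self.wakeup.set()

    def add_worker(self, worker):
        self.workers.append(worker)
        self.notify()

    def stop(self):
        self.running = False
        self.notify()

    def run(self):
        while True:
            # Clear first, then check: a notify() after this point leaves
            # the flag set, so the wait() below cannot miss it.
            self.wakeup.clear()
            if not self.running:
                break
            while self.workers and self.running:
                self.workers.popleft().work()
            self.wakeup.wait()


class CallWorker(object):
    """Hypothetical worker that just calls a function."""

    def __init__(self, func):
        self.func = func

    def work(self):
        self.func()


results = []
finished = threading.Event()
q = EventWorkerQueue()
q.start()
for n in range(3):
    q.add_worker(CallWorker(lambda n=n: results.append(n)))
q.add_worker(CallWorker(finished.set))
finished.wait(timeout=5)
q.stop()
q.join(timeout=5)
print(results)  # [0, 1, 2]
```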
You don't need conditions. Python already has a mechanism made exactly for this: `Queue.Queue`. Change `ThreadedWorkerQueue.workers` from a list to a `Queue`, and you no longer need to worry about conditions, locks, notifications, etc. It will vastly simplify your code. You need to replace:

- the list with a `Queue`
- `append()` with `Queue.put()`
- `pop(0)` with `Queue.get()`

and get rid of `with self.condition: ...`.
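A minimal sketch of that suggestion, assuming Python 3 (where the module is named `queue`; in Python 2 it is `Queue`). Since a blocking `get()` has to be woken up somehow, `stop()` here puts a sentinel object on the queue; the sentinel is an assumption of this sketch, not something the answer prescribes. Because the queue is unbounded, `put()` never blocks the producer.

```python
import queue
import threading
import time

_STOP = object()  # sentinel assumed by this sketch; not part of the queue API


class ThreadedWorkerQueue(threading.Thread):
    def __init__(self):
        super().__init__()
        self.workers = queue.Queue()  # unbounded, so put() never blocks

    def add_worker(self, worker):
        self.workers.put(worker)

    def stop(self):
        # Queued after any pending workers, so those still run first.
        self.workers.put(_STOP)

    def run(self):
        while True:
            worker = self.workers.get()  # blocks until an item is available
            if worker is _STOP:
                break
            worker.work()


finished = []


class TimeWorker(object):
    """Like the question's TimeWorker, but sleeps and records its duration."""

    def __init__(self, seconds):
        self.seconds = seconds

    def work(self):
        time.sleep(self.seconds)
        finished.append(self.seconds)


q = ThreadedWorkerQueue()
q.start()
q.add_worker(TimeWorker(0.3))
t0 = time.time()
q.add_worker(TimeWorker(0.1))
blocked = time.time() - t0
q.stop()  # signal only; joining is left to the caller
q.join()
print("Blocked %.4f seconds until worker was added." % blocked)
print(finished)  # [0.3, 0.1]
```

Workers queued before `stop()` still run, because the sentinel is taken off the queue in FIFO order behind them.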
Also, it's not good practice to call `self.join()` from within `stop()`. Leave that to the calling thread. If you have several threads to stop, you want to stop them all first, and only then join them all.
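A sketch of the stop-all-then-join-all pattern, using a hypothetical `Ticker` thread whose `stop()` only sets a `threading.Event` and returns immediately. Signalling every thread first lets them all wind down concurrently, so the caller waits roughly as long as the slowest shutdown rather than the sum of all of them.

```python
import threading
import time


class Ticker(threading.Thread):
    """Hypothetical thread whose stop() only signals and never joins."""

    def __init__(self):
        super().__init__()
        self.stop_requested = threading.Event()
        self.ticks = 0

    def stop(self):
        self.stop_requested.set()  # returns immediately

    def run(self):
        # Event.wait(timeout) returns True once the flag has been set.
        while not self.stop_requested.wait(0.01):
            self.ticks += 1


threads = [Ticker() for _ in range(4)]
for t in threads:
    t.start()
time.sleep(0.05)

# First signal every thread ...
for t in threads:
    t.stop()
# ... then wait for each one to finish.
for t in threads:
    t.join()

print(all(not t.is_alive() for t in threads))
```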