My `ThreadedWorkerQueue.add_worker()` method blocks until the Worker currently being processed has finished. Is there a clean design that lets me add new workers to the `ThreadedWorkerQueue` without blocking the thread that calls `add_worker()`, while still using conditions?
Here's a short SSCCE:
```python
import time
import threading

class Worker(object):
    def work(self):
        pass

class TimeWorker(Worker):
    def __init__(self, seconds):
        super(TimeWorker, self).__init__()
        self.seconds = seconds

    def work(self):
        for i in xrange(self.seconds):
            print "Working ... (%d)" % i
            time.sleep(1)

class ThreadedWorkerQueue(threading.Thread):
    def __init__(self):
        super(ThreadedWorkerQueue, self).__init__()
        self.condition = threading.Condition()
        self.workers = []
        self.running = False

    def add_worker(self, worker):
        with self.condition:
            self.workers.append(worker)
            self.condition.notify()

    def stop(self):
        with self.condition:
            self.running = False
            self.condition.notify()
        self.join()

    def consume(self):
        if self.workers:
            worker = self.workers.pop(0)
            worker.work()

    def run(self):
        self.running = True
        while True:
            with self.condition:
                if not self.running:
                    break
                self.condition.wait()
                self.consume()

def main():
    queue = ThreadedWorkerQueue()
    queue.start()
    queue.add_worker(TimeWorker(3))
    time.sleep(1)
    tstart = time.time()
    queue.add_worker(TimeWorker(2))
    print "Blocked", time.time() - tstart, "seconds until worker was added."
    queue.stop()

main()
```
Edit
OK, so my original idea was that a Condition can be woken up whenever the thread can continue to consume Workers. This is the basic principle of the producer/consumer design: avoid continuous polling and only do work when there is work to do.
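For comparison, the Condition-based design can be kept if the consumer holds the condition only while it checks and pops the shared list, and calls `work()` after releasing it; in the SSCCE, `consume()` runs inside `with self.condition:`, which is what makes `add_worker()` wait. The sketch below is a hypothetical `ConditionWorkerQueue` (Python 3; `SleepWorker` is a made-up stand-in for `TimeWorker`), not code from the linked repository. It also waits in a `while` loop, so a `notify()` sent before the consumer started waiting is not lost.

```python
import collections
import threading
import time


class SleepWorker:
    """Hypothetical stand-in for TimeWorker."""

    def __init__(self, seconds):
        self.seconds = seconds

    def work(self):
        time.sleep(self.seconds)


class ConditionWorkerQueue(threading.Thread):
    """Consumer thread that waits on a Condition but never holds it while working."""

    def __init__(self):
        super().__init__()
        self.condition = threading.Condition()
        self.workers = collections.deque()
        self.running = True  # set here, so stop() before run() is not overwritten
        self.done = []  # finished workers, recorded only for demonstration

    def add_worker(self, worker):
        # The lock is held just long enough to append and notify.
        with self.condition:
            self.workers.append(worker)
            self.condition.notify()

    def stop(self):
        with self.condition:
            self.running = False
            self.condition.notify()

    def run(self):
        while True:
            with self.condition:
                # Only wait if there is nothing to do; re-check after every wake-up.
                while self.running and not self.workers:
                    self.condition.wait()
                if not self.workers:  # stopped and the queue is drained
                    break
                worker = self.workers.popleft()
            # The condition is released here, so add_worker() does not block.
            worker.work()
            self.done.append(worker)


q = ConditionWorkerQueue()
q.start()
q.add_worker(SleepWorker(0.3))
time.sleep(0.1)  # let the consumer start on the first worker
t0 = time.time()
q.add_worker(SleepWorker(0.1))
blocked = time.time() - t0
q.stop()
q.join()  # joining is done by the caller, not inside stop()
print("add_worker() blocked for %.4f s" % blocked)
```

With this layout `stop()` lets the queue finish any workers already added before the thread exits.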
Just now, I had an idea: use a lock that is acquired by default and is released when new workers can be consumed. But I'm not sure whether this is a good way to do it. Can someone spot an issue (e.g. potential deadlocks)?
The full code is on GitHub: https://github.com/NiklasRosenstein/async/blob/73828ecaa2990a71b63caf93c32f9cce5ec11d27/async.py#L686-L750
```python
import thread
import threading
import warnings

# Excerpt: WorkerQueue is defined in the linked async.py.
class ThreadedWorkerQueue(WorkerQueue, threading.Thread):
    r""" This class implements the consumer side of the producer/consumer
    design: :class:`Worker` objects start working as soon as they become
    available. Every object that adds Workers to this queue is a producer. """

    def __init__(self):
        WorkerQueue.__init__(self)
        threading.Thread.__init__(self)
        self.lock = threading.Lock()

    def __del__(self):
        if self.running:
            self.stop()
            warnings.warn("ThreadedWorkerQueue not stopped before it was lost.")

    def notify(self):
        r""" Notify the ThreadedWorkerQueue that processing can be continued.
        It is usually not necessary to call this method manually. """
        try:
            self.lock.release()
        except (thread.error, RuntimeError):
            # The lock was already released: a wake-up is already pending.
            pass

    def stop(self, join=True, clear=False):
        r""" Interrupt the thread, pausing its actions until :meth:`start`
        is called again. All remaining workers are kept alive unless
        *clear* is True. """
        if clear: self.workers.clear()
        self.running = False
        self.notify()
        if join: self.join()

    def consume(self):
        r""" Just like :meth:`WorkerQueue.work_off`, but doesn't override
        the value of :prop:`running`. """
        while self.workers:
            worker = self.workers.popleft()
            self.current = worker
            worker.work()
            self.current = None

    # WorkerQueue

    def add_worker(self, worker):
        super(ThreadedWorkerQueue, self).add_worker(worker)
        self.notify()

    # threading.Thread

    def start(self):
        r""" Overrides :meth:`threading.Thread.start` to allow starting
        the ThreadedWorkerQueue multiple times. """
        threading.Thread.__init__(self)
        return threading.Thread.start(self)

    def run(self):
        self.running = True
        while True:
            self.consume()
            if not self.running: break
            # Blocks until notify() releases the lock.
            self.lock.acquire()
```
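The Lock above is used as a one-slot wake-up flag, and releasing a Lock that is not held raises an error (`thread.error` in Python 2, `RuntimeError` in Python 3), which is why `notify()` has to swallow it. As a sketch of the same idea built on a primitive meant for signalling, here is a hypothetical `EventWorkerQueue` using `threading.Event` (Python 3; class names are made up, and this is not the repository's code). Setting an Event that is already set is harmless, so no exception handling is needed.

```python
import collections
import threading
import time


class SleepWorker:
    """Hypothetical stand-in for a Worker."""

    def __init__(self, seconds):
        self.seconds = seconds

    def work(self):
        time.sleep(self.seconds)


class EventWorkerQueue(threading.Thread):
    """The 'release when there is work' idea, expressed with threading.Event."""

    def __init__(self):
        super().__init__()
        # deque.append() and deque.popleft() are documented as thread-safe.
        self.workers = collections.deque()
        self.wakeup = threading.Event()
        self.running = True
        self.done = []  # finished workers, recorded only for demonstration

    def add_worker(self, worker):
        self.workers.append(worker)
        self.wakeup.set()  # setting an already-set Event is a no-op

    def stop(self):
        self.running = False
        self.wakeup.set()

    def run(self):
        while True:
            self.wakeup.wait()
            # Clear before draining: a worker added while draining sets the
            # event again, so the next wait() returns instead of missing it.
            self.wakeup.clear()
            while self.workers:
                worker = self.workers.popleft()
                worker.work()
                self.done.append(worker)
            if not self.running:
                break


q = EventWorkerQueue()
q.start()
q.add_worker(SleepWorker(0.3))
time.sleep(0.1)
t0 = time.time()
q.add_worker(SleepWorker(0.1))
blocked = time.time() - t0
q.stop()
q.join()
print("add_worker() blocked for %.4f s" % blocked)
```

The ordering of `clear()` before the drain loop is the part that matters; clearing after draining would reintroduce a window in which a freshly added worker is not noticed until the next `add_worker()` call.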
You don't need conditions. Python already has a perfect mechanism for exactly this: `Queue.Queue`. Change `ThreadedWorkerQueue.workers` from a list to a `Queue`, and you don't need to worry about conditions, locks, notifications, etc. It will vastly simplify your code. You need to replace:

- `self.workers = []` with `self.workers = Queue.Queue()`
- `self.workers.append(worker)` with `self.workers.put(worker)`
- `self.workers.pop(0)` with `self.workers.get()`

and get rid of `with self.condition: ...`.

Also, it's not good practice to call `self.join()` from within `stop()`. Leave it to the calling thread. If you have several threads to stop, you'd want to stop them all first, and only then join them all.
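A minimal sketch of both suggestions, assuming Python 3 (where the module is `queue`; in Python 2 it is `Queue`). `QueueWorkerQueue`, `SleepWorker` and the `_STOP` sentinel are hypothetical names used for illustration. `stop()` only enqueues the sentinel, and the caller stops every thread before joining any of them.

```python
import queue  # named Queue in Python 2
import threading
import time


class SleepWorker:
    """Hypothetical stand-in for TimeWorker."""

    def __init__(self, seconds):
        self.seconds = seconds

    def work(self):
        time.sleep(self.seconds)


class QueueWorkerQueue(threading.Thread):
    """Consumer thread backed by queue.Queue: no explicit locks or conditions."""

    _STOP = object()  # sentinel put on the queue to end run()

    def __init__(self):
        super().__init__()
        self.workers = queue.Queue()
        self.done = []  # finished workers, recorded only for demonstration

    def add_worker(self, worker):
        self.workers.put(worker)  # unbounded queue, so put() does not block

    def stop(self):
        # Only signals the thread; joining is left to the caller.
        # Workers queued before the sentinel are still processed.
        self.workers.put(self._STOP)

    def run(self):
        while True:
            worker = self.workers.get()  # blocks until something is queued
            if worker is self._STOP:
                break
            worker.work()
            self.done.append(worker)


queues = [QueueWorkerQueue() for _ in range(3)]
for q in queues:
    q.start()

t0 = time.time()
for q in queues:
    q.add_worker(SleepWorker(0.2))
    q.add_worker(SleepWorker(0.1))
blocked = time.time() - t0

# Stop them all first, then join them all, so the shutdowns overlap.
for q in queues:
    q.stop()
for q in queues:
    q.join()
elapsed = time.time() - t0
print("adding blocked for %.4f s, total %.2f s" % (blocked, elapsed))
```

Because the three threads drain their queues concurrently, the total time stays close to one thread's work (about 0.3 s) rather than the sum over all threads.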