Python threading, non-blocking production


The ThreadedWorkerQueue.add_worker() method blocks until a Worker has been consumed. Is there a clean design that allows adding new workers to the ThreadedWorkerQueue without blocking the thread that calls ~.add_worker(), while still working with conditions?

Here's a short SSCCE:

import time
import threading

class Worker(object):

    def work(self):
        pass

class TimeWorker(Worker):

    def __init__(self, seconds):
        super(TimeWorker, self).__init__()
        self.seconds = seconds

    def work(self):
        for i in xrange(self.seconds):
            print "Working ... (%d)" % i
            time.sleep(1)

class ThreadedWorkerQueue(threading.Thread):

    def __init__(self):
        super(ThreadedWorkerQueue, self).__init__()
        self.condition = threading.Condition()
        self.workers = []
        self.running = False

    def add_worker(self, worker):
        with self.condition:
            self.workers.append(worker)
            self.condition.notify()

    def stop(self):
        with self.condition:
            self.running = False
            self.condition.notify()
        self.join()

    def consume(self):
        if self.workers:
            worker = self.workers.pop(0)
            worker.work()

    def run(self):
        self.running = True
        while True:
            with self.condition:
                if not self.running:
                    break

                self.condition.wait()
                self.consume()

def main():
    queue = ThreadedWorkerQueue()
    queue.start()

    queue.add_worker(TimeWorker(3))
    time.sleep(1)

    tstart = time.time()
    queue.add_worker(TimeWorker(2))
    print "Blocked", time.time() - tstart, "seconds until worker was added."

    queue.stop()

main()

Edit

Ok, so my original idea was that a Condition can be notified whenever the thread can continue consuming Workers. This is the basic principle of the producer/consumer design: skip continuous polling and only do the work when there actually is work to do.
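For illustration, the textbook shape of such a consumer loop looks roughly like this (a sketch, not my actual code); the key point is that the condition is only held while the shared list is touched, never while a worker runs:

def run(self):
    self.running = True
    while True:
        with self.condition:
            # Wait only while there is nothing to do.
            while self.running and not self.workers:
                self.condition.wait()
            if not self.running:
                break
            worker = self.workers.pop(0)
        # The condition is released again before the work happens,
        # so add_worker() is never blocked by a running worker.
        worker.work()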

Just now, I had the idea of using a lock that is acquired by default and released whenever new workers can be consumed. But I'm not sure whether this is a good way to do it. Can someone spot an issue (e.g. potential deadlocks)?

The full code is on GitHub: https://github.com/NiklasRosenstein/async/blob/73828ecaa2990a71b63caf93c32f9cce5ec11d27/async.py#L686-L750

class ThreadedWorkerQueue(WorkerQueue, threading.Thread):
    r""" This class implements the consumer design, introducing :class:`Worker`
    objects to start working as soon as there are new workers available. Every
    object adding Workers to this queue are the producers. """

    def __init__(self):
        WorkerQueue.__init__(self)
        threading.Thread.__init__(self)
        self.lock = threading.Lock()

    def __del__(self):
        if self.running:
            self.stop()
            warnings.warn("ThreadedWorkerQueue not stopped before its lost.")

    def notify(self):
        r""" Notify the ThreadedWorkerQueue that processing can be continued.
        It is usually not necessary to call this method manually. """

        try:
            self.lock.release()
        except (thread.error, RuntimeError):
            pass

    def stop(self, join=True, clear=False):
        r""" Interrupt the thread in its doing, pausing the threads actions
        until :meth:`start` is called again. All remaining workers are kept
        alive unless *clear* is specified True. """

        if clear: self.workers.clear()
        self.running = False
        self.notify()
        if join: self.join()

    def consume(self):
        r""" Just like :meth:`WorkerQueue.work_off`, but doesn't override
        the value of :prop:`running`. """

        while self.workers:
            worker = self.workers.popleft()
            self.current = worker
            worker.work()
        self.current = None

    # WorkerQueue

    def add_worker(self, worker):
        super(ThreadedWorkerQueue, self).add_worker(worker)
        self.notify()

    # threading.Thread

    def start(self):
        r""" Overrides :meth:`threading.Thread.start` to allow starting
        the ThreadedWorkerQueue multiple times. """

        threading.Thread.__init__(self)
        return threading.Thread.start(self)

    def run(self):
        self.running = True
        while True:
            self.consume()
            if not self.running: break
            self.lock.acquire()

1 Answer

Answered by shx2:

... but still working with conditions?

You don't need conditions. Python already has a perfect mechanism exactly for this: Queue.Queue. Change ThreadedWorkerQueue.workers from a list to a Queue, and you don't need to worry about conditions, locks, notifications, etc. It will vastly simplify your code.

You need to replace:

  • list of workers with a Queue
  • append+notify with Queue.put
  • wait+pop with Queue.get

and get rid of with self.condition: ....
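A rough sketch of what that could look like (untested, reusing the names from your code; the _STOP sentinel is just one way to wake the thread up for shutdown):

import Queue
import threading

class ThreadedWorkerQueue(threading.Thread):

    _STOP = object()  # sentinel used to wake the thread for shutdown

    def __init__(self):
        super(ThreadedWorkerQueue, self).__init__()
        self.workers = Queue.Queue()

    def add_worker(self, worker):
        # put() does not block the producer on an unbounded Queue.
        self.workers.put(worker)

    def stop(self):
        # Only request the shutdown here; joining is left to the caller.
        self.workers.put(self._STOP)

    def run(self):
        while True:
            worker = self.workers.get()  # blocks until an item is available
            if worker is self._STOP:
                break
            worker.work()

The calling thread then does queue.stop() followed by queue.join() when it actually wants to wait.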

Also, it's not a good practice to call self.join() from within stop(). Leave it to the calling thread. In case you have several threads you need to stop, you'd want to first stop them all, and only then join them all.
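In code, something along these lines (a sketch, assuming stop() only signals and the caller joins):

queues = [ThreadedWorkerQueue() for _ in range(3)]
for q in queues:
    q.start()

# ... add workers to the queues ...

# First request every queue to stop, then join them all, so the
# shutdowns overlap instead of waiting for each thread in turn.
for q in queues:
    q.stop()
for q in queues:
    q.join()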