Using concurrent.futures to consume many dequeued messages at a time

I'm consuming messages from a RabbitMQ channel, and I'd like to consume n messages at a time. I think I could use a ProcessPoolExecutor (or ThreadPoolExecutor). I just wonder whether it's possible to know if there's a free worker in the pool.

This is what I want to write:

from concurrent import futures

executor = futures.ProcessPoolExecutor(max_workers=5)
running = []

def consume(message):
    print("actually consuming a single message")

def on_message(channel, method_frame, header_frame, message):
    # this method is called once per incoming message
    future = executor.submit(consume, message)
    block_until_a_free_worker(executor, future)

def block_until_a_free_worker(executor, future):
    running.append(future) # this grows forever!
    futures.wait(running, timeout=5, return_when=futures.FIRST_COMPLETED)

[...]
channel.basic_consume(on_message, 'my_queue')
channel.start_consuming()

I need to write the function block_until_a_free_worker. This method should be able to check whether all the workers are in use.

Alternatively, I could use a blocking variant of executor.submit, if one is available.

I tried a different approach: updating the list of futures as they complete. I explicitly added and removed futures from a list and then waited like this:

futures.wait(running, timeout=5, return_when=futures.FIRST_COMPLETED)

It doesn't seem to be a solution.
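For reference, the waiting approach described above can bound the number of in-flight tasks if the pending set is pruned with the done set that futures.wait returns, instead of being appended to forever. This is a minimal sketch, assuming a ThreadPoolExecutor (swapped in for ProcessPoolExecutor so it runs anywhere) and a hypothetical handle function standing in for consume; submit_bounded and MAX_WORKERS are illustrative names, not part of any library.

```python
from concurrent import futures

MAX_WORKERS = 5

executor = futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)
running = set()  # futures submitted and not yet seen as finished


def handle(message):
    # hypothetical stand-in for the real consume() work
    return message * 2


def submit_bounded(message):
    """Submit one message, blocking while MAX_WORKERS tasks are pending."""
    while len(running) >= MAX_WORKERS:
        # returns as soon as at least one pending future has finished
        done, _ = futures.wait(running, return_when=futures.FIRST_COMPLETED)
        # prune finished futures, so the set never grows past MAX_WORKERS
        running.difference_update(done)
    future = executor.submit(handle, message)
    running.add(future)
    return future


submitted = [submit_bounded(m) for m in range(20)]
results = [f.result() for f in submitted]
executor.shutdown()
print(results[:5], len(running) <= MAX_WORKERS)  # prints: [0, 2, 4, 6, 8] True
```

Called from on_message, submit_bounded would block the consumer callback until a worker frees up, which is the behaviour block_until_a_free_worker was meant to provide.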

I could set a callback with future.add_done_callback and possibly count the running instances...
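Counting through add_done_callback can also work if the counter is guarded by a threading.Condition, so a submitter can sleep until the count drops below the limit. This is a sketch under similar assumptions: a ThreadPoolExecutor, plus a hypothetical CountingSubmitter class and slow_square task that exist only for illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class CountingSubmitter:
    """Hypothetical helper: tracks running tasks through add_done_callback."""

    def __init__(self, max_workers):
        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.cond = threading.Condition()
        self.running = 0  # submitted and not yet finished
        self.peak = 0     # highest value `running` ever reached

    def submit(self, fn, *args):
        with self.cond:
            # block until fewer than max_workers tasks are running
            self.cond.wait_for(lambda: self.running < self.max_workers)
            self.running += 1
            self.peak = max(self.peak, self.running)
        try:
            future = self.executor.submit(fn, *args)
        except BaseException:
            self._release()  # submission failed: undo the count
            raise
        # called when the future finishes, fails, or is cancelled
        future.add_done_callback(lambda _f: self._release())
        return future

    def _release(self):
        with self.cond:
            self.running -= 1
            self.cond.notify()  # wake one blocked submit()


def slow_square(n):
    time.sleep(0.01)
    return n * n


submitter = CountingSubmitter(max_workers=3)
futs = [submitter.submit(slow_square, n) for n in range(10)]
squares = [f.result() for f in futs]
submitter.executor.shutdown()
print(squares, submitter.peak)
```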

Any hints or ideas? Thank you.

Best answer, by noxdafox

I gave a similar answer here.

Semaphores exist to limit access to a resource to a fixed number of workers. With one permit per pool worker, submitting blocks until a running task finishes and gives its permit back.

from threading import Semaphore
from concurrent.futures import ProcessPoolExecutor

class TaskManager:
    def __init__(self, workers):
        self.pool = ProcessPoolExecutor(max_workers=workers)
        # one permit per worker process
        self.workers = Semaphore(workers)

    def new_task(self, function, *args, **kwargs):
        """Start a new task, blocks if all workers are busy."""
        self.workers.acquire()  # flag a worker as busy

        try:
            future = self.pool.submit(function, *args, **kwargs)
        except BaseException:
            self.workers.release()  # submission failed, give the permit back
            raise

        # release the permit once the task finishes (result, error or cancel)
        future.add_done_callback(self.task_done)

        return future

    def task_done(self, future):
        """Called once task is done, releases one worker."""
        self.workers.release()
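One property worth checking is that the permit is released even when a task raises, since add_done_callback fires for failed and cancelled futures as well as successful ones. Otherwise a few bad messages would exhaust the semaphore and block the consumer for good. This is a self-contained sketch, assuming a thread-based copy of TaskManager (ThreadPoolExecutor swapped in so it runs without pickling concerns) and a hypothetical consume that rejects every third message.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class TaskManager:
    """Thread-based copy of the semaphore pattern (ThreadPoolExecutor swapped in)."""

    def __init__(self, workers):
        self.pool = ThreadPoolExecutor(max_workers=workers)
        self.workers = threading.Semaphore(workers)

    def new_task(self, function, *args):
        self.workers.acquire()  # blocks while all workers are busy
        future = self.pool.submit(function, *args)
        future.add_done_callback(self.task_done)
        return future

    def task_done(self, future):
        # runs on success, on exception, and on cancellation alike
        self.workers.release()


def consume(message):
    # hypothetical handler: every third message fails
    time.sleep(0.01)
    if message % 3 == 0:
        raise ValueError(f"bad message {message}")
    return message


manager = TaskManager(workers=2)
# with only 2 permits, this loop would hang if failed tasks leaked permits
futs = [manager.new_task(consume, m) for m in range(9)]
failed = sum(1 for f in futs if f.exception() is not None)
ok = [f.result() for f in futs if f.exception() is None]
manager.pool.shutdown()
print(failed, ok)  # prints: 3 [1, 2, 4, 5, 7, 8]
```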