I'm consuming messages from a RabbitMQ channel, and I would like to process n messages at a time. I think I could use a ProcessPoolExecutor (or ThreadPoolExecutor), but I wonder whether it's possible to know if there is a free worker in the pool.
This is what I want to write:
from concurrent import futures

executor = futures.ProcessPoolExecutor(max_workers=5)
running = []

def consume(message):
    print("actually consuming a single message")

def on_message(channel, method_frame, header_frame, message):
    # this method is called once per incoming message
    future = executor.submit(consume, message)
    block_until_a_free_worker(executor, future)

def block_until_a_free_worker(executor, future):
    running.append(future)  # this grows forever!
    futures.wait(running, timeout=5, return_when=futures.FIRST_COMPLETED)
[...]
channel.basic_consume(on_message, 'my_queue')
channel.start_consuming()
I need to write the function block_until_a_free_worker. This function should check whether all the workers are in use.
Alternatively, I could use a blocking variant of executor.submit, if one is available.
I also tried a different approach: changing the list of futures as they complete. I explicitly added and removed futures from the list and then waited like this:
futures.wait(running, timeout=5, return_when=futures.FIRST_COMPLETED)
This does not seem to be a solution.
I could set a future.add_done_callback and possibly count the running instances...
Any hints or ideas? Thank you.
I gave a similar answer here.
Semaphores serve the purpose of limiting access to a resource to a fixed number of workers at a time.
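As a minimal sketch of that idea (not a definitive implementation), a semaphore can be wrapped around the executor so that submit blocks while every worker is busy. The class name BoundedExecutor, its max_pending parameter, and the run_demo helper are hypothetical names introduced for illustration; they are not part of concurrent.futures. The sketch assumes Python 3 and uses a ThreadPoolExecutor in the demo to stay self-contained; a ProcessPoolExecutor should work the same way, since done callbacks run in the process that added them.

```python
import threading
import time
from concurrent import futures


class BoundedExecutor:
    """Hypothetical wrapper: submit() blocks while max_pending tasks are in flight."""

    def __init__(self, executor, max_pending):
        self._executor = executor
        # One permit per task allowed to be in flight at the same time.
        self._semaphore = threading.BoundedSemaphore(max_pending)

    def submit(self, fn, *args, **kwargs):
        # Blocks here until a previously submitted task has finished.
        self._semaphore.acquire()
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except BaseException:
            # Submission failed, so hand the permit back.
            self._semaphore.release()
            raise
        # Hand the permit back when the task completes, fails, or is cancelled.
        future.add_done_callback(lambda _: self._semaphore.release())
        return future

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)


def consume(message):
    # Stand-in for the real per-message work.
    time.sleep(0.05)
    return message


def run_demo(n_messages=6, max_workers=2):
    pool = BoundedExecutor(
        futures.ThreadPoolExecutor(max_workers=max_workers), max_workers
    )
    submitted = [pool.submit(consume, i) for i in range(n_messages)]
    pool.shutdown(wait=True)
    return [f.result() for f in submitted]
```

With this in place, on_message from the question would only need to call submit on the wrapper: the call returns immediately while a worker is free and blocks otherwise, so no list of running futures has to be maintained.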