Designing python code to execute concurrent requests by server for maximum requests

177 views Asked by At

I am trying to write the Python script below. I have 5 servers, and each server can process at most 2 requests concurrently. I have a pool of 10 requests to be processed. Each server picks up 2 requests, processes them, and picks up another request from the pool as soon as it has capacity to do so.

The code below, which I have written, waits for both of a server's requests to finish, and only then does the server pick up another 2 requests. I want each server to pick up a new request as soon as it has finished processing one.

async def process_request(server_id, request_id):
    """Simulate server ``server_id`` handling request ``request_id``.

    A random whole number of seconds between 10 and 30 (inclusive) is drawn,
    a start message is printed, the coroutine sleeps for that long via
    ``asyncio.sleep`` and then a completion message is printed.
    """
    processing_time = random.randint(10, 30)

    start_message = f"Server {server_id} is processing request {request_id} for {processing_time} seconds"
    print(start_message)

    # Stand-in for the real work of the request.
    await asyncio.sleep(processing_time)

    done_message = f"Server {server_id} finished processing request {request_id}"
    print(done_message)

async def server_worker(server_id, queue, num_concurrent_requests_per_server):
    """Serve requests from ``queue`` forever, one whole batch at a time.

    Every round takes ``num_concurrent_requests_per_server`` request ids from
    the queue, runs them concurrently, waits for the *entire* batch to finish
    and then enqueues one new random request per request just handled.
    Because of the batch-wide ``gather``, a slot that frees up early is not
    refilled until every request of the batch has completed.
    """
    while True:
        # Collect the ids that make up the next batch.
        batch = []
        for _ in range(num_concurrent_requests_per_server):
            try:
                batch.append(await queue.get())
            except asyncio.QueueEmpty:
                break

        # Start the whole batch, then block until all of it is done.
        running = [
            asyncio.create_task(process_request(server_id, request_id))
            for request_id in batch
        ]
        await asyncio.gather(*running)

        # Add one more request to the queue for each request just handled.
        for _ in batch:
            await queue.put(random.randint(1, 100))

async def main():
    """Start one worker per server, seed the per-server queues, then wait.

    Each server gets its own ``asyncio.Queue``.  The initial requests are
    dealt to the queues round-robin, after which the coroutine waits on
    ``join()`` of every queue and finally cancels the worker tasks.
    """
    num_servers = 5
    num_concurrent_requests_per_server = 2
    total_requests = 100  # not referenced anywhere below

    # One private queue per server.
    servers = [asyncio.Queue() for _ in range(num_servers)]

    # Start server workers
    server_tasks = [
        asyncio.create_task(
            server_worker(index, servers[index], num_concurrent_requests_per_server)
        )
        for index in range(num_servers)
    ]

    # Generate and enqueue initial requests, dealing them out round-robin
    initial_request_count = num_servers * num_concurrent_requests_per_server
    for position in range(initial_request_count):
        target_queue = servers[position % num_servers]
        await target_queue.put(random.randint(1, 100))

    # Wait for all requests to be processed
    await asyncio.gather(*[server_queue.join() for server_queue in servers])

    # Cancel server workers
    for worker in server_tasks:
        worker.cancel()

# Script entry point: start an event loop and run main() in it.
if __name__ == "__main__":
    asyncio.run(main())
4

There are 4 answers

0
VonC On

You could modify the server_worker function to use a single shared queue from which all servers fetch requests (instead of creating a separate queue for each server). That would simplify the request-handling logic and ensure that any server can pick up a new request as soon as it has capacity.

import asyncio
import random

async def process_request(server_id, request_id):
    """Pretend to process ``request_id`` on server ``server_id``.

    Prints a start line, sleeps for a randomly chosen 10 to 30 seconds
    (inclusive, whole seconds) and prints a completion line.
    """
    processing_time = random.randint(10, 30)

    announcement = f"Server {server_id} is processing request {request_id} for {processing_time} seconds"
    print(announcement)

    # The sleep stands in for the actual request handling.
    await asyncio.sleep(processing_time)

    completion = f"Server {server_id} finished processing request {request_id}"
    print(completion)

async def server_worker(server_id, queue, num_concurrent_requests_per_server):
    """Drain ``queue`` while running a bounded number of requests at once.

    While the server has spare capacity it takes the next request id from
    ``queue`` without waiting and starts a ``process_request`` task for it.
    When it is at capacity it waits until at least one running task has
    finished, which frees a slot immediately.  The worker returns once the
    queue is empty and all of its own in-flight requests have finished, so
    requests should be enqueued before the workers are started.

    Args:
        server_id: Identifier passed through to ``process_request``.
        queue: ``asyncio.Queue`` of request ids shared by the servers.
        num_concurrent_requests_per_server: Maximum number of requests this
            server processes at the same time; must be at least 1.

    Raises:
        ValueError: If ``num_concurrent_requests_per_server`` is below 1.
    """
    if num_concurrent_requests_per_server < 1:
        raise ValueError("num_concurrent_requests_per_server must be at least 1")

    ongoing_requests = set()
    while True:
        if len(ongoing_requests) < num_concurrent_requests_per_server:
            # ``await queue.get()`` never raises QueueEmpty (it waits instead),
            # so the non-blocking variant is needed to detect an empty queue.
            try:
                request_id = queue.get_nowait()
            except asyncio.QueueEmpty:
                break
            ongoing_requests.add(
                asyncio.create_task(process_request(server_id, request_id))
            )
        else:
            # Wait for any one of the tasks to complete.  ``asyncio.wait``
            # hands back the still-pending tasks, which is the only place
            # finished tasks are dropped (no done-callback, so no double
            # removal and no late-bound closure over ``task``).
            _, ongoing_requests = await asyncio.wait(
                ongoing_requests, return_when=asyncio.FIRST_COMPLETED
            )

    # The queue is empty: let the requests that are still running finish
    # before reporting this server as done.
    if ongoing_requests:
        await asyncio.wait(ongoing_requests)

async def main():
    """Fill one shared queue with requests and run every server against it.

    All requests are enqueued before any worker task is created; the
    coroutine then waits for the worker tasks themselves to return.
    """
    num_servers = 5
    num_concurrent_requests_per_server = 2
    total_requests = 10  # Adjusted to match your setup description

    # A single queue shared by every server.
    queue = asyncio.Queue()

    # Generate and enqueue initial requests
    for _ in range(total_requests):
        await queue.put(random.randint(1, 100))

    # One worker task per server, all reading from the same queue.
    server_tasks = []
    for server_index in range(num_servers):
        worker = server_worker(server_index, queue, num_concurrent_requests_per_server)
        server_tasks.append(asyncio.create_task(worker))

    # Wait for all requests to be processed
    await asyncio.gather(*server_tasks)

# Script entry point: start an event loop and run main() in it.
if __name__ == "__main__":
    asyncio.run(main())

Each server_worker now maintains a set of ongoing request tasks (ongoing_requests). It adds a new task to this set whenever it starts processing a request and removes it when the request is done. That way, it can always check if it has capacity to start a new request.

Instead of collecting tasks and waiting for all to finish, server_worker immediately handles tasks. If the server is at capacity, it waits for at least one task to finish before continuing to poll the queue.

0
Johnny Cheesecutter On

to fix the issue you should do the following:

  1. Instead of collecting request_ids in a list and checking the list's length against 'num_concurrent_requests_per_server', you should add tasks to a list and check how many active tasks you have.
  2. Replace asyncio.gather with asyncio.wait and wait for the first task to complete. When one of the tasks is done, you update the task list and add new tasks to it.

Below is the code with implementation:

async def server_worker(server_id, queue, num_concurrent_requests_per_server):
    """Serve requests from ``queue`` forever, refilling a slot as soon as it frees.

    The worker keeps at most one pending ``queue.get()`` task plus a set of
    running ``process_request`` tasks and waits on all of them with
    ``asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED)``.  A finished
    getter starts processing of the request id it fetched; a finished
    request enqueues exactly one replacement request.

    Args:
        server_id: Identifier passed through to ``process_request``.
        queue: ``asyncio.Queue`` of request ids for this server.
        num_concurrent_requests_per_server: Maximum number of requests
            processed at the same time; must be at least 1.

    Raises:
        ValueError: If ``num_concurrent_requests_per_server`` is below 1.
    """
    if num_concurrent_requests_per_server < 1:
        raise ValueError("num_concurrent_requests_per_server must be at least 1")

    getter = None      # pending queue.get() task, if any
    running = set()    # process_request tasks currently in flight

    while True:
        # If we are not full, create a waiter for a new request.  Only one
        # getter exists at a time, so the number of running requests can
        # never exceed the limit.
        if getter is None and len(running) < num_concurrent_requests_per_server:
            getter = asyncio.create_task(queue.get(), name='job_getter')

        waitables = set(running)
        if getter is not None:
            waitables.add(getter)

        # Await until the first task is done.
        finished, _ = await asyncio.wait(
            waitables, return_when=asyncio.FIRST_COMPLETED
        )

        for task in finished:
            if task is getter:
                # A request id arrived: start processing it right away.
                getter = None
                request_id = task.result()
                running.add(
                    asyncio.create_task(process_request(server_id, request_id))
                )
            else:
                # A request finished: free its slot and add one more request
                # to the queue (one per finished request, not a running total).
                running.discard(task)
                await queue.put(random.randint(1, 100))
0
Booboo On

It appears that you want to have 5 servers each processing two concurrent requests and when these 2 requests have completed to then enqueue an additional 2 requests (the servers never terminate as a result). Essentially for any request that is processed a new request is enqueued, although your logic does the processing and enqueueing in pairs.

I believe we can achieve the same results if instead of having 5 servers first getting 2 requests and only then creating 2 new tasks to process them, we have a single queue for 10 servers processing requests one at a time and for every request that gets processed the server enqueues an additional request. The advantage here is simplified code and no extra task creation code required for each request. It also makes sense to have a single queue for all requests.

This only works if the order in which requests are gotten from the single queue does not matter, that is they do not have to be requests that were enqueued successively, which can only be guaranteed when using one queue per server as you have done. I believe this to be the case since the way you are enqueueing requests is in a round-robin fashion and therefore a single queue does not have successively created requests. But perhaps you have over-simplified the code. Anyway, the following code is based on that assumption.

Note that the call to queue.get() can never raise a QueueEmpty exception.

import asyncio
import random

async def server_worker(server_id, queue):
    """Process requests from ``queue`` one at a time, forever.

    After each request has been handled, one new random request is put
    back on the queue, so the loop has no exit condition of its own.
    """

    async def handle(request_id):
        # Simulated work: announce, sleep 10 to 30 seconds, announce again.
        processing_time = random.randint(10, 30)
        print(f"Server {server_id} is processing request {request_id} for {processing_time} seconds")
        await asyncio.sleep(processing_time)
        print(f"Server {server_id} finished processing request {request_id}")

    while True:
        # This will never raise a QueueEmpty exception:
        next_request = await queue.get()
        await handle(next_request)

        # Queue up a new request:
        await queue.put(random.randint(1, 100))

async def main():
    """Fill a single shared queue and run every server worker against it.

    The requests are enqueued first; the worker coroutines are then created
    and awaited together with ``asyncio.gather``.
    """
    num_servers = 10
    total_requests = 100

    # One queue shared by all servers.
    queue = asyncio.Queue()

    # Generate and enqueue requests
    for _ in range(total_requests):
        await queue.put(random.randint(1, 100))

    # Create and await for all server tasks to complete
    workers = [server_worker(server_index, queue) for server_index in range(num_servers)]
    await asyncio.gather(*workers)

# Script entry point: start an event loop and run main() in it.
if __name__ == "__main__":
    asyncio.run(main())
0
Elnaz On

To fix this, you need to change your server_worker function to pick up a new request as soon as one of the tasks is done. You can do this by calling asyncio.wait with return_when=asyncio.FIRST_COMPLETED, which returns as soon as at least one of the tasks has completed. You can also use the queue.task_done method to indicate that a request has been processed. Here is an example of how you can modify your server_worker function:

async def server_worker(server_id, queue, num_concurrent_requests_per_server):
    """Process requests from ``queue``, refilling a slot as soon as it frees.

    The worker tops up its set of running ``process_request`` tasks from the
    queue without waiting, then waits until at least one of them finishes.
    For every finished request it calls ``queue.task_done()`` and enqueues
    one new random request.  It returns when the queue is empty and none of
    its own requests are still running.

    Args:
        server_id: Identifier passed through to ``process_request``.
        queue: ``asyncio.Queue`` of request ids for this server.
        num_concurrent_requests_per_server: Maximum number of requests
            processed at the same time.
    """
    tasks = set()  # Use a set to store the tasks
    while True:
        # Try to fill up the tasks set with new requests from the queue.
        # ``await queue.get()`` would wait instead of raising QueueEmpty, so
        # the non-blocking variant is required for the ``except`` to fire.
        while len(tasks) < num_concurrent_requests_per_server:
            try:
                request_id = queue.get_nowait()
            except asyncio.QueueEmpty:
                break
            tasks.add(asyncio.create_task(process_request(server_id, request_id)))

        # If there are no tasks, break the loop
        if not tasks:
            break

        # Wait for the first task to complete; ``asyncio.wait`` returns the
        # finished tasks and the ones that are still pending.
        done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for _ in done:
            queue.task_done()  # Indicate that the request has been processed

            # Add one more request to the queue
            await queue.put(random.randint(1, 100))

This way, each server worker will always try to process as many requests as possible, and pick up a new request as soon as one of the tasks is done.