I am trying to write the Python script below. I have 5 servers, and each server can process at most 2 requests concurrently. There is a pool of 10 requests to be processed. Each server picks up 2 requests, processes them, and picks up another request from the pool as soon as it has capacity to do so.
The code I have written waits for both of a server's 2 requests to finish before the server picks up another 2 requests. I want each server to pick up a new request as soon as it is done processing one.
import asyncio
import random

async def process_request(server_id, request_id):
    processing_time = random.randint(10, 30)
    print(f"Server {server_id} is processing request {request_id} for {processing_time} seconds")
    await asyncio.sleep(processing_time)
    print(f"Server {server_id} finished processing request {request_id}")

async def server_worker(server_id, queue, num_concurrent_requests_per_server):
    while True:
        request_ids = []
        for _ in range(num_concurrent_requests_per_server):
            try:
                request_id = await queue.get()
                request_ids.append(request_id)
            except asyncio.QueueEmpty:
                break
        tasks = []
        for request_id in request_ids:
            task = asyncio.create_task(process_request(server_id, request_id))
            tasks.append(task)
        await asyncio.gather(*tasks)
        for _ in range(len(request_ids)):
            await queue.put(random.randint(1, 100))  # Add one more request to the queue

async def main():
    num_servers = 5
    num_concurrent_requests_per_server = 2
    total_requests = 100

    servers = [asyncio.Queue() for _ in range(num_servers)]

    # Start server workers
    server_tasks = []
    for i in range(num_servers):
        task = asyncio.create_task(server_worker(i, servers[i], num_concurrent_requests_per_server))
        server_tasks.append(task)

    # Generate and enqueue initial requests
    for _ in range(num_servers * num_concurrent_requests_per_server):
        server_id = _ % num_servers
        await servers[server_id].put(random.randint(1, 100))

    # Wait for all requests to be processed
    await asyncio.gather(*[servers[i].join() for i in range(num_servers)])

    # Cancel server workers
    for task in server_tasks:
        task.cancel()

if __name__ == "__main__":
    asyncio.run(main())
You could modify the server_worker function to pull from a single shared queue from which all servers fetch requests (instead of creating a separate queue for each server). That simplifies the request-handling logic and ensures any server can pick up a new request as soon as it has capacity.
Each server_worker then maintains a set of ongoing request tasks (ongoing_requests). It adds a new task to this set whenever it starts processing a request and removes the task when the request is done, so the worker can always check whether it has capacity to start a new request.
Instead of collecting tasks and waiting for all of them to finish, server_worker handles tasks immediately: if the server is at capacity, it waits for at least one task to finish (rather than all of them) before polling the queue again.