Make multiprocessing.Queue accessible from asyncio

1.1k views Asked by At

Given a multiprocessing.Queue that is filled from different Python threads, created via ThreadPoolExecutor.submit(...).

How to access that Queue with asyncio / Trio / Anyio in a safe manner (context FastAPI) and reliable manner?

I am aware of Janus library, but prefer a custom solution here.

Asked (hopefully) more concisely:

How to implement the

await <something_is_in_my_multiprocessing_queue>

to have it accesible with async/await and to prevent blocking the event loop?

What synchronization mechanism in general would you suggest?

(Attention here: multiprocessing.Queue not asyncio.Queue)

1

There are 1 answers

5
Martin Senne On BEST ANSWER

Actually, I figured it out.

Given a method, that reads the mp.Queue:

def read_queue_blocking():
    return queue.get()

Comment: And this is the main issue: A call to get is blocking.

We can now either

For FastAPI

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await websocket.accept()
    while True:
        import anyio
        queue_result = await anyio.to_thread.run_sync(read_queue_blocking)
        await websocket.send_text(f"Message text was: {queue_result}")