Use anyio.TaskGroup with fastapi.StreamingResponse

anyio is a dependency of Starlette and, therefore, of FastAPI. I find its task groups convenient for making concurrent requests from one of my API servers to external services.

I would also like to stream the results out as soon as they are ready. fastapi.StreamingResponse could do the trick, but I would need to keep the task group running after returning the StreamingResponse, which seems to go against the idea of structured concurrency.

Using an asynchronous generator may look like the obvious solution, but in general yield cannot be used inside a task group context, according to the Trio documentation: https://trio.readthedocs.io/en/stable/reference-core.html#cancel-scopes-and-nurseries

Here is an example of a FastAPI server that seems to work, though it aggregates the responses before returning them:

import anyio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse


app = FastAPI()


@app.get("/")
async def root():
    # What to put below?
    result = await main()
    return StreamingResponse(iter(result))


async def main():
    send_stream, receive_stream = anyio.create_memory_object_stream()

    result = []
    async with anyio.create_task_group() as tg:
        async with send_stream:
            for num in range(5):
                tg.start_soon(sometask, num, send_stream.clone())

        async with receive_stream:
            async for entry in receive_stream:
                # What to do here???
                result.append(entry)

    return result


async def sometask(num, send_stream):
    await anyio.sleep(1)
    async with send_stream:
        await send_stream.send(f'number {num}\n')



if __name__ == "__main__":
    import uvicorn
    # Debug-only configuration
    uvicorn.run(app)

So the question is: is there something similar to @trio_util.trio_async_generator in anyio, or is it possible to use @trio_util.trio_async_generator with FastAPI directly?

Maybe there are other solutions?

There is 1 answer

Answer by phi friday:
import anyio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


@app.get("/")
async def root():
    return StreamingResponse(main())


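async def main():
    send_stream, receive_stream = anyio.create_memory_object_stream()

    async with anyio.create_task_group() as tg:
        # Give every task its own clone, then close the original send stream.
        async with send_stream:
            for num in range(5):
                tg.start_soon(sometask, num, send_stream.clone())

        # Yield each entry as soon as a task sends it; the loop ends
        # once all clones have been closed.
        async with receive_stream:
            async for entry in receive_stream:
                yield entry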


async def sometask(num, send_stream):
    async with send_stream:
        for i in range(1000):
            await anyio.sleep(1)
            await send_stream.send(f"number {num}\n")


if __name__ == "__main__":
    import uvicorn

    # Debug-only configuration
    uvicorn.run(app)

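Unexpectedly, it works: passing the async generator main() directly to StreamingResponse streams each entry as soon as a task sends it.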