I have an endpoint in Django that orchestrates a bunch of API calls to other underlying services. I intend to use a long-lived instance of httpx.AsyncClient, which will ultimately be responsible for the API calls.
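    from functools import partial

    import httpx
    from asgiref.sync import async_to_sync
    from rest_framework import viewsets
    from rest_framework.decorators import action
    from rest_framework.request import Request
    from rest_framework.response import Response

    class Api:  # for historical reasons, this class has both sync and async methods
        ...

    # Created once at import time, so the same client is shared by every request.
    AsyncApi = partial(Api, async_client=httpx.AsyncClient(timeout=30))

    class MyViewSet(viewsets.GenericViewSet):
        @action(...)
        def merge(self, request: Request) -> Response:
            ...
            result = async_to_sync(merge_task)(..., api=AsyncApi(token=request.auth))
            ...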
I'm running some tests against a dummy server, and I can see that merge_task sometimes catches RuntimeError('Event loop is closed') when calling api (or, ultimately, the long-lived instance of httpx.AsyncClient). So I'd bet that the event loop the httpx.AsyncClient is bound to gets closed after some time, and that there's no issue with asgiref's async_to_sync.
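To make the suspicion concrete, here is a minimal sketch using only asyncio. It assumes that async_to_sync, when called from a synchronous view, runs the coroutine on a fresh event loop that is closed afterwards, much like asyncio.run does; I have not verified this against the asgiref source. get_loop is a hypothetical helper, and call_soon merely stands in for whatever a long-lived client does with the loop it was first used on.

```python
import asyncio


async def get_loop() -> asyncio.AbstractEventLoop:
    # Return the event loop that is running this coroutine.
    return asyncio.get_running_loop()


# Each asyncio.run() call creates a fresh loop and closes it on exit.
first_loop = asyncio.run(get_loop())
second_loop = asyncio.run(get_loop())

loops_differ = first_loop is not second_loop
first_closed = first_loop.is_closed()

# Scheduling work on the first (now closed) loop fails, which is what
# an object still bound to that loop would run into on a later request.
try:
    first_loop.call_soon(lambda: None)
    message = None
except RuntimeError as exc:
    message = str(exc)

print(loops_differ, first_closed, message)
```

If the assumption holds, a client created at import time and first used inside one request's loop would hit the same error as soon as a later request tries to reuse its connections.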
Am I correct? If so, is the only solution to instantiate httpx.AsyncClient on every request?
I tried to call merge_task through the endpoint, but it caught RuntimeError('Event loop is closed'). The expectation is that no error related to async programming is raised.