I'm building a library that leverages asyncio internally. While the user shouldn't be aware of it, the internal implementation currently wraps the async code with the asyncio.run() porcelain wrapper.
However, some users will be executing this library code from a jupyter notebook, and I'm struggling to replace the asyncio.run()
with a wrapper that's safe for either environment.
Here's what I've tried:
ASYNC_IO_NO_RUNNING_LOOP_MSG = 'no running event loop'
def jupyter_safe_run_coroutine(async_coroutine, _test_mode: bool = False)
try:
loop = asyncio.get_running_loop()
task = loop.create_task(async_coroutine)
result = loop.run_until_complete(task) # <- fails as loop is already running
# OR
asyncio.wait_for(task, timeout=None, loop=loop) # <- fails as this is an async method
result = task.result()
except RuntimeError as e:
if _test_mode:
raise e
if ASYNC_IO_NO_RUNNING_LOOP_MSG in str(e):
return asyncio.run(async_coroutine)
except Exception as e:
raise e
Requirements
- We use python 3.8, so we can't use asyncio.Runner context manager
- We can't use threading, so the solution suggested here would not work
Problem:
How can I wait/await for the async_coroutine
, or the task/future provided by loop.create_task(async_coroutine)
to be completed?
None of the methods above actually do the waiting, and for the reasons stated in the comments.
Update
I've found this nest_asyncio library that's built to solve this problem exactly:
ASYNC_IO_NO_RUNNING_LOOP_MSG = 'no running event loop'
HAS_BEEN_RUN = False
def jupyter_safe_run_coroutine(async_coroutine, _test_mode: bool = False):
global HAS_BEEN_RUN
if not HAS_BEEN_RUN:
_apply_nested_asyncio_patch()
HAS_BEEN_RUN = True
return asyncio.run(async_coroutine)
def _apply_nested_asyncio_patch():
try:
loop = asyncio.get_running_loop()
logger.info(f'as get_running_loop() returned {loop}, this environment has it`s own event loop.\n'
f'Patching with nest_asyncio')
import nest_asyncio
nest_asyncio.apply()
except RuntimeError as e:
if ASYNC_IO_NO_RUNNING_LOOP_MSG in str(e):
logger.info(f'as get_running_loop() raised {e}, this environment does not have it`s own event loop.\n'
f'No patching necessary')
else:
raise e
Still, there are some issues I'm facing with it:
- As per this SO answer, there might be starvation issues
- Any logs written in the async_coroutine are not printed in the jupyter notebook
- The jupyter notebook kernel occasionally crashes upon completion of the task
Edit
For context, the library internally calls external APIs for data enrichment of a user-provided dataframe:
# user code using the library
import my_lib
df = pd.DataFrame(data='some data')
enriched_df = my_lib.enrich(df)
It's usually a good idea to expose the asynchronous function. This way you will give your users more flexibility.
If some of your users can't (or don't want to) use asynchronous calls to your functions, they will be able to call the async function using
asyncio.run(your_function())
. Or in the rare situation where they have an event loop running but can't make async calls they could use thecreate_task
+add_one_callback
method described here. (I really have no idea why such a use case may happen, but for the sake of the argument I included it.)Hidding the asynchronous interface from your users is not the best idea because it limits their capabilities. They will probably fork your package to patch it and make the exposed function async or call the hidden async function directly. None of which is good news for you (harder to document / track bugs). I would really suggest to stick to the simplest solution and provide the
async
functions as the main entry points.Suppose the following package code followed by 3 different usage of it:
Client codes
Typical clients will probably just use it this way:
For some people, the following might make sense. For example if your package is the only asynchronous thing they will ever use. Or maybe they are not yet confortable using async code (these you can probably convince to try
client_code_a
instead):The very few (I'm tempted to say none):