jupyter notebooks-safe asyncio run wrapper method for a library


I'm building a library that leverages asyncio internally. While the user shouldn't be aware of it, the internal implementation currently wraps the async code with the asyncio.run() porcelain wrapper.

However, some users will be executing this library code from a jupyter notebook, and I'm struggling to replace the asyncio.run() with a wrapper that's safe for either environment.

Here's what I've tried:

import asyncio

ASYNC_IO_NO_RUNNING_LOOP_MSG = 'no running event loop'


def jupyter_safe_run_coroutine(async_coroutine, _test_mode: bool = False):
    try:
        loop = asyncio.get_running_loop()
        task = loop.create_task(async_coroutine)
        result = loop.run_until_complete(task) # <- fails as loop is already running
        # OR
        asyncio.wait_for(task, timeout=None, loop=loop) # <- fails as this is an async method
        result = task.result()
    except RuntimeError as e:
        if _test_mode:
            raise e
        if ASYNC_IO_NO_RUNNING_LOOP_MSG in str(e):
            return asyncio.run(async_coroutine)
    except Exception as e:
        raise e

Requirements

  1. We use Python 3.8, so we can't use the asyncio.Runner context manager (added in Python 3.11)
  2. We can't use threading, so the solution suggested here would not work

Problem:

How can I wait/await for the async_coroutine, or the task/future provided by loop.create_task(async_coroutine) to be completed?

Neither of the methods above actually does the waiting, for the reasons stated in the code comments.
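For reference, the first failure can be reproduced outside a notebook. This is a minimal sketch assuming plain CPython asyncio; `inner` and `outer` are illustrative names, with `outer` standing in for any code that already runs inside an event loop (as a Jupyter cell does):

```python
import asyncio


async def inner():
    return 42


async def outer():
    # We are inside a coroutine, so this loop is already running.
    loop = asyncio.get_running_loop()
    coro = inner()
    try:
        # Blocking entry point: it refuses to start on a running loop.
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(exc)
    return None


message = asyncio.run(outer())
print(message)
```

The second attempt fails differently: calling `asyncio.wait_for(...)` without `await` only creates a coroutine object and never waits for anything.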


Update

I've found this nest_asyncio library that's built to solve this problem exactly:


import asyncio
import logging

logger = logging.getLogger(__name__)

ASYNC_IO_NO_RUNNING_LOOP_MSG = 'no running event loop'

HAS_BEEN_RUN = False


def jupyter_safe_run_coroutine(async_coroutine, _test_mode: bool = False):
    global HAS_BEEN_RUN
    if not HAS_BEEN_RUN:
        _apply_nested_asyncio_patch()
        HAS_BEEN_RUN = True
    return asyncio.run(async_coroutine)


def _apply_nested_asyncio_patch():
    try:
        loop = asyncio.get_running_loop()
        logger.info(f'as get_running_loop() returned {loop}, this environment has its own event loop.\n'
                    f'Patching with nest_asyncio')
        import nest_asyncio
        nest_asyncio.apply()
    except RuntimeError as e:
        if ASYNC_IO_NO_RUNNING_LOOP_MSG in str(e):
            logger.info(f'as get_running_loop() raised {e}, this environment does not have its own event loop.\n'
                        f'No patching necessary')
        else:
            raise e

Still, there are some issues I'm facing with it:

  1. As per this SO answer, there might be starvation issues
  2. Any logs written in the async_coroutine are not printed in the jupyter notebook
  3. The jupyter notebook kernel occasionally crashes upon completion of the task
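As a side note on the detection step in the snippet above: `asyncio.get_running_loop()` raises `RuntimeError` exactly when no loop is running in the current thread, so the check may not need to match on the exception message. A minimal sketch, where `has_running_loop` and `_probe` are hypothetical helper names and not part of asyncio or nest_asyncio:

```python
import asyncio


def has_running_loop() -> bool:
    """Return True if an event loop is running in the current thread."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # Raised only when there is no running loop in this thread.
        return False
    return True


async def _probe() -> bool:
    # Called from inside a coroutine, so a loop is running here.
    return has_running_loop()


outside = has_running_loop()    # evaluated at module level of a plain script
inside = asyncio.run(_probe())  # evaluated inside a running loop
print(outside, inside)
```

In a plain script `outside` is false and `inside` is true; in a Jupyter cell both would be expected to be true, since the kernel already runs a loop.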

Edit

For context, the library internally calls external APIs for data enrichment of a user-provided dataframe:

# user code using the library
import pandas as pd
import my_lib

# placeholder data; the column name is only illustrative
df = pd.DataFrame({'text': ['some data']})
enriched_df = my_lib.enrich(df)

There are 2 answers

Best answer, by cglacet

It's usually a good idea to expose the asynchronous function. This way you will give your users more flexibility.

If some of your users can't (or don't want to) make asynchronous calls to your functions, they can run the async function with asyncio.run(your_function()). In the rare situation where they have an event loop running but can't make async calls, they could use the create_task + add_done_callback method described here. (I really have no idea why such a use case would arise, but I included it for the sake of argument.)

Hiding the asynchronous interface from your users is not the best idea because it limits what they can do. They will probably fork your package to patch it and make the exposed function async, or call the hidden async function directly. Neither is good news for you (it makes documenting and tracking bugs harder). I would really suggest sticking to the simplest solution and providing the async functions as the main entry points.

Suppose the following package code, followed by three different ways of using it:

async def package_code():
    return "package"

Client code

Typical clients will probably just use it this way:

async def client_code_a():
    print(await package_code())

# asyncio.run(client_code_a())

For some people, the following might make sense, for example if your package is the only asynchronous thing they will ever use. Or maybe they are not yet comfortable with async code (you can probably convince these users to try client_code_a instead):

def client_code_b():
    print(asyncio.run(package_code()))

# client_code_b()

Very few clients (I'm tempted to say none) would need this:

async def client_code_c():
    # asyncio.run() cannot be called from a running event loop:
    # print(asyncio.run(package_code()))
    loop = asyncio.get_running_loop()
    task = loop.create_task(package_code())
    task.add_done_callback(lambda t: print(t.result()))

# asyncio.run(client_code_c())
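To make the suggestion concrete for the my_lib.enrich use case from the question, here is a minimal sketch of an async entry point. `enrich_async` and `_fetch_enrichment` are hypothetical names, plain dicts stand in for the dataframe rows, and `asyncio.sleep` stands in for the external API call:

```python
import asyncio


async def _fetch_enrichment(row: dict) -> dict:
    # Stand-in for one external API call (assumption: one call per row).
    await asyncio.sleep(0.01)
    return {**row, "enriched": True}


async def enrich_async(rows: list) -> list:
    """Async entry point: enrich all rows concurrently, preserving order."""
    return await asyncio.gather(*(_fetch_enrichment(row) for row in rows))


# In a plain script there is no running loop, so the caller starts one:
result = asyncio.run(enrich_async([{"id": 1}, {"id": 2}]))
print(result)
```

In a notebook, where a loop is already running, the same entry point would be called as `result = await enrich_async(rows)`, so the library itself never has to decide how to start or reuse a loop.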
Answer by cglacet

I'm still not sure I understand what your goal is, but I'll describe with code what I tried to explain in my comment, so you can tell me where your issue lies in the following.

If your package requires the user to call some functions (your_package_function in the example) that take coroutines as arguments, then you shouldn't worry about the event loop.

That means the package shouldn't call asyncio.run or loop.run_until_complete. The client should (in almost all cases) be responsible for starting the event loop.

Your package code should assume there is an event loop running. Since I don't know your package's goal, I just made a function that feeds a "test" argument to whatever coroutine function the client passes in:

import asyncio

async def your_package_function(coroutine):
    print("- Package internals start")
    task = asyncio.create_task(coroutine("test"))
    await asyncio.sleep(.5) # Simulates slow tasks within your package
    print("- Package internals completed other task")
    x = await task
    print("- Package internals end")
    return x

The client (package user) should then call the following:

async def main():
    x = await your_package_function(return_with_delay)
    print(f"Computed value = {x}")

async def return_with_delay(value):
    print("+ User function start")
    await asyncio.sleep(.2)
    print("+ User function end")
    return value

await main()
# or asyncio.run(main()) if needed

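This would print (the user function runs and finishes during the package's 0.5 s sleep, so its lines appear before "completed other task"):

- Package internals start
+ User function start
+ User function end
- Package internals completed other task
- Package internals end
Computed value = test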