How to await for a job multiple times in trio?

This is similar to "Can I await the same Task multiple times in Python?", but for trio instead of asyncio. Basically, in trio, how can I await the result of an async function multiple times while executing it only once?

For example, what should the argument to coro_b and coro_c be, given that the two run concurrently?

async def coro_a():
    print("executing coro a")
    return 'a'


async def coro_b(task_a):
    task_a_result = await task_a
    print("from coro_b: ", task_a_result)
    return 'b'


async def coro_c(task_a):
    task_a_result = await task_a
    print("from coro_c: ", task_a_result)
    return 'c'

async def main():
    t_a = coro_a()

    # At some point
    t_b = coro_b(t_a)
    ...
    # At some other point, maybe concurrently to t_b
    t_c = coro_c(t_a)

    b = await t_b
    c = await t_c

main()

(In my case, the trio root loop is managed by a framework, pyfuse3, and I only need to define my own subclass, which contains several async functions to implement; these can be executed in parallel. So I'm not sure how the underlying calls are made, but it is safe to assume they are made correctly. Please feel free to fill in the remaining parts if a "full" version of this snippet, with a working main function, would be useful.)

(I'm familiar with JS promises and with concurrency/parallelism concepts and practices in general, but I have little experience with asyncio and none with trio in Python.)

There are 2 answers

Matthias Urlichs (BEST ANSWER)

Short answer: you don't.

Trio doesn't have tasks. Given

async def foo(i=21):
    await trio.sleep(i/10)
    return 2*i

you never do task = foo(); do_whatever(); result = await task. Not in trio. That's just not an idiom which Trio likes to support, for multiple reasons which are out of scope for this answer. Thus Trio doesn't have a "task" object. It doesn't need one.

Instead, you always do result = await foo(). There is no task, it's just some code you call (and get the result of). The question of awaiting it multiple times simply doesn't arise.

If you do want to process a result multiple times, you need a wrapper that saves the result somewhere and then sets a trio.Event. The code that wants the result can then wait on that event and process the result.

Something like this:

import trio

class ResultOf:
    """Run an async callable once; any number of waiters can get its result."""
    scope: trio.CancelScope = None

    def __init__(self, p, a, k):
        self.evt = trio.Event()
        self.p = (p, a, k)  # callable, positional args, keyword args

    async def run_in(self, nursery):
        # nursery.start() returns the value passed to task_status.started()
        self.scope = await nursery.start(self._wrap)

    async def _wrap(self, task_status=trio.TASK_STATUS_IGNORED):
        with trio.CancelScope() as sc:
            task_status.started(sc)
            try:
                p, a, k = self.p
                self.res = await p(*a, **k)
            except Exception as e:
                self.err = e
            except BaseException as e:
                # Basic rules for handling BaseException in Python:
                # - you never save them for "recycling" in a different context
                # - you always re-raise them
                self.err = RuntimeError("got killed")
                self.err.__cause__ = e
                raise
            else:
                self.err = None
            finally:
                # Wake up every waiter, whether we succeeded or failed
                self.evt.set()

    def cancel(self):
        self.scope.cancel()

    async def get(self):
        await self.evt.wait()
        if self.err is not None:
            raise self.err
        return self.res

async def result_of(n, p, *a, **k):
    res = ResultOf(p, a, k)
    await res.run_in(n)
    return res  # the caller needs the wrapper to call .get() on it
...
async with trio.open_nursery() as n:
    promise = await result_of(n, foo, 2)
    ...
    assert 4 == await promise.get()

There are reasons this kind of wrapper is not part of the Trio core. Among others: what happens if your promise.get runs in a scope that gets cancelled, so the result isn't needed any more – should that cancellation propagate to foo or not?

Thus the real answer is probably to re-factor your code so that you no longer need this kind of thing in the first place.

More generally, when converting something to Trio, it's easier to start with sync code and just sprinkle async and await onto it than to start with an asyncio/promise/etc. base and try to remove or convert all those pesky tasks.

The structure of a language shapes your thinking. Promises and tasks are not part of the Structured Concurrency concept that Trio has been written to adhere to. Read https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/ (written by Trio's main author) for more background.

Arthur Tacca

One option is to use the ResultCapture class in aioresult. (Disclaimer: I wrote that library.) Your example would look like this:

import trio
from aioresult import ResultCapture

async def coro_a():
    print("executing coro a")
    return 'a'

async def coro_b(result_a):
    await result_a.wait_done()
    print("from coro_b: ", result_a.result())
    return 'b'

async def coro_c(result_a):
    await result_a.wait_done()
    print("from coro_c: ", result_a.result())
    return 'c'

async def main():
    async with trio.open_nursery() as nursery:
        result_a = ResultCapture.start_soon(nursery, coro_a)
        result_b = ResultCapture.start_soon(nursery, coro_b, result_a)
        result_c = ResultCapture.start_soon(nursery, coro_c, result_a)

    print("result b:", result_b.result())
    print("result c:", result_c.result())

trio.run(main)

This class is not especially different from the one in Matthias's answer, so in that sense this is a very similar answer. (One difference is that the exception handling in aioresult is a bit simpler. Another is that ResultCapture does not include its own CancelScope; would that be useful?)

I agree with Matthias's point that you should try to avoid building your control flow around tasks like this. Instead, when possible, wait for multiple routines by running them in a nursery and then accessing the results after the nursery completes (as in the snippet just above), and wait for individual routines by awaiting them directly. For example, we could modify your example to await coro_a directly, since the other routines both need to wait for it anyway:

async def main():
    async with trio.open_nursery() as nursery:
        result_a = await coro_a()
        result_b = ResultCapture.start_soon(nursery, coro_b, result_a)
        result_c = ResultCapture.start_soon(nursery, coro_c, result_a)

    print("result b:", result_b.result())
    print("result c:", result_c.result())

In this snippet, result_a is the actual return value of coro_a rather than a ResultCapture object, so coro_b and coro_c would use it directly instead of calling wait_done() and result().

However, while I think it's good to avoid ResultCapture.wait_done(), I do think it's very reasonable – even preferable – to return a value from your coroutine even if it's running in a nursery with start_soon() and then access that later (once the nursery is finished) with ResultCapture.result(). The usual trick to "return" a value from a background task in Trio is to pass in a mutable variable like a list or dictionary, but that feels less natural to me (and you can always form a list or dictionary of ResultCapture objects if that's useful).