asynchronous python itertools chain multiple generators

UPDATED QUESTION FOR CLARITY:

Suppose I have two generator functions that do some processing:

def gen1(): # just for examples,
  yield 1   # yields actually carry 
  yield 2   # different computation weight 
  yield 3   # in my case

def gen2():
  yield 4
  yield 5
  yield 6

I can chain them with itertools

from itertools import chain

mix = chain(gen1(), gen2())

and then I can create another generator function object with it,

def mix_yield():
   for item in mix:
      yield item

or, if I just want to call next(mix), that works too.

My question is, how can I do the equivalent in asynchronous code?

Because I need it to:

  • return in yield (one by one), or with next iterator
  • the fastest resolved yield first (async)

PREV. UPDATE:

After experimenting and researching, I found the aiostream library, which describes itself as an async version of itertools. This is what I tried:

import asyncio
from aiostream import stream

async def gen1(): 
     await asyncio.sleep(0) 
     yield 1 
     await asyncio.sleep(0) 
     yield 2 
     await asyncio.sleep(0) 
     yield 3 

async def gen2(): 
     await asyncio.sleep(0) 
     yield 4 
     await asyncio.sleep(0) 
     yield 5 
     await asyncio.sleep(0) 
     yield 6 

a_mix = stream.combine.merge(gen1(),gen2())

async def a_mix_yield():
   for item in a_mix:
      yield item

but I still can't do next(a_mix)

TypeError: 'merge' object is not an iterator

or next(await a_mix)

raise StreamEmpty()

Although I still can make it into a list:

print(await stream.list(a_mix))
# [1, 2, 4, 3, 5, 6]

So one goal is completed, with one more to go:

  • return in yield (one by one), or with next iterator (still open)
  • the fastest resolved yield first (async) (done)

There are 3 answers

user4815162342 (best answer)

Python's next built-in function is just a convenient way of invoking the underlying __next__ method on the object. The async equivalent of __next__ is the __anext__ method on the async iterator. Python 3.10 added anext() as a built-in; on earlier versions there is no anext global function in the standard library (the aiostream library provides one), but one could easily write it:

async def anext(aiterator):
    return await aiterator.__anext__()

But the savings are so small that, in the rare situations when this is needed, one may as well invoke __anext__ directly. The async iterator is in turn obtained from an async iterable by calling its __aiter__ method (in analogy to __iter__ provided by regular iterables). Async iteration driven manually looks like this:

a_iterator = obj.__aiter__()          # regular method
elem1 = await a_iterator.__anext__()  # async method
elem2 = await a_iterator.__anext__()  # async method
...

__anext__ will raise StopAsyncIteration when no more elements are available. To loop over async iterators one should use async for.
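To make the StopAsyncIteration behaviour concrete, here is a minimal standard-library sketch that drains an async generator by hand and then with async for. The names countdown, drain_manually, and drain_with_async_for are made up for this illustration; they are not part of the question's code or of aiostream.

```python
import asyncio


async def countdown(n):
    # Hypothetical async generator used only for this illustration.
    while n > 0:
        await asyncio.sleep(0)
        yield n
        n -= 1


async def drain_manually(aiterable):
    # Drive the iteration by hand, roughly what `async for` does internally.
    items = []
    aiterator = aiterable.__aiter__()
    while True:
        try:
            items.append(await aiterator.__anext__())
        except StopAsyncIteration:
            # Raised once the async iterator has no more elements.
            break
    return items


async def drain_with_async_for(aiterable):
    # The idiomatic equivalent of the loop above.
    return [item async for item in aiterable]


manual = asyncio.run(drain_manually(countdown(3)))
idiomatic = asyncio.run(drain_with_async_for(countdown(3)))
print(manual, idiomatic)
```

Both calls collect the same items, which is why the manual form is rarely needed outside of cases where only the first few elements are wanted.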

Here is a runnable example, based on your code, using both __anext__ and async for to exhaust the stream set up with aiostream.stream.combine.merge:

async def main():
    a_mix = stream.combine.merge(gen1(), gen2())
    async with a_mix.stream() as streamer:
        mix_iter = streamer.__aiter__()    
        print(await mix_iter.__anext__())
        print(await mix_iter.__anext__())
        print('remaining:')
        async for x in mix_iter:
            print(x)

asyncio.run(main())

Mohit Kumar

I came across this answer and I looked at the aiostream library. Here is the code I came up with to merge multiple async generators. It does not use any library.

import asyncio
from typing import Any, AsyncGenerator, Set


async def merge_generators(gens: Set[AsyncGenerator[Any, None]]) -> AsyncGenerator[Any, None]:
    # Map each in-flight __anext__() task to the generator it belongs to.
    pending_tasks = {asyncio.ensure_future(g.__anext__()): g for g in gens}
    while pending_tasks:
        done, _ = await asyncio.wait(pending_tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
        for d in done:
            dg = pending_tasks.pop(d)
            try:
                result = d.result()
            except StopAsyncIteration:
                # This generator is exhausted, so it is not rescheduled.
                continue
            yield result
            # Ask the same generator for its next item.
            pending_tasks[asyncio.ensure_future(dg.__anext__())] = dg

Hope this helps you and let me know if there are any bugs in this.
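One thing the function above does not address is what happens when the consumer stops iterating early: the outstanding __anext__() tasks are left running. Below is a minimal sketch of one way to handle that, under the assumption that cancelling the leftover tasks is acceptable for your generators. The names merge_with_cleanup, ticker, and demo are hypothetical and exist only for this example.

```python
import asyncio


async def merge_with_cleanup(*gens):
    # Map each in-flight __anext__() task to its source generator.
    tasks = {asyncio.ensure_future(g.__anext__()): g for g in gens}
    try:
        while tasks:
            done, _ = await asyncio.wait(
                tasks.keys(), return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                gen = tasks.pop(task)
                try:
                    item = task.result()
                except StopAsyncIteration:
                    continue  # this source is exhausted
                # Schedule the next item from the same source, then emit.
                tasks[asyncio.ensure_future(gen.__anext__())] = gen
                yield item
    finally:
        # Runs on normal exit, on error, and when the consumer stops early.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def ticker(name, count, delay):
    # Hypothetical source: yields `count` labelled items, `delay` seconds apart.
    for i in range(count):
        await asyncio.sleep(delay)
        yield f"{name}{i}"


async def demo():
    merged = merge_with_cleanup(ticker("a", 3, 0.01), ticker("b", 3, 0.1))
    first_two = []
    async for item in merged:
        first_two.append(item)
        if len(first_two) == 2:
            break
    await merged.aclose()  # triggers the finally block above
    return first_two


result = asyncio.run(demo())
print(result)
```

Calling aclose() explicitly makes the cleanup deterministic instead of leaving it to garbage collection or event-loop shutdown.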

Tiago Coutinho

I've managed to merge and chain several async generators with these simple helpers.

Different error-handling strategies can be applied, so for clarity I don't show any error handling here.

import asyncio


async def merge(*streams):
    n = len(streams)
    queue = asyncio.Queue()
    signal = object()
    async def enqueue(stream):
        async for event in stream:
            await queue.put(event)
        await queue.put(signal)
    tasks = [asyncio.create_task(enqueue(stream)) for stream in streams]
    while n > 0:
        event = await queue.get()
        if event is signal:
            n -= 1
        else:
            yield event
    await asyncio.wait(tasks)


async def chain(*streams):
    for stream in streams:
        async for item in stream:
            yield item

Example usage:

async def gen(name, n, nap):
    for i in range(n):
        await asyncio.sleep(nap)
        yield f"Event #{i} for {name}"


async def main():
    print("Merging 2 async generators")
    g1 = gen("task A", 3, 0.5)
    g2 = gen("task B", 6, 0.3)
    async for item in merge(g1, g2):
        print(f"  {item}")

    print("Chaining 2 async generators")
    g1 = gen("task A", 3, 0.5)
    g2 = gen("task B", 6, 0.3)
    async for item in chain(g1, g2):
        print(f"  {item}")


asyncio.run(main())
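Since error handling is left out of the helpers above, here is a rough sketch of one possible strategy for the queue-based merge: forward an exception from any source to the consumer and cancel the remaining producers. This is only one option among several, and the names merge_propagating_errors, numbers, failing, and demo are hypothetical.

```python
import asyncio


async def merge_propagating_errors(*streams):
    queue = asyncio.Queue()
    finished = object()  # sentinel: one producer has ended

    async def enqueue(stream):
        try:
            async for event in stream:
                await queue.put((event, None))
        except Exception as exc:
            queue.put_nowait((None, exc))  # hand the error to the consumer
        finally:
            queue.put_nowait((finished, None))

    tasks = [asyncio.ensure_future(enqueue(s)) for s in streams]
    remaining = len(tasks)
    try:
        while remaining:
            event, exc = await queue.get()
            if exc is not None:
                raise exc  # re-raise the source's error in the consumer
            if event is finished:
                remaining -= 1
            else:
                yield event
    finally:
        # Stop any producers that are still running.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def numbers(count, delay):
    for i in range(count):
        await asyncio.sleep(delay)
        yield i


async def failing(delay):
    await asyncio.sleep(delay)
    raise ValueError("boom")
    yield  # unreachable, but makes this function an async generator


async def demo():
    seen = []
    try:
        async for item in merge_propagating_errors(numbers(3, 0.01), failing(0.2)):
            seen.append(item)
    except ValueError as exc:
        return seen, str(exc)
    return seen, None


seen, error = asyncio.run(demo())
print(seen, error)
```

An alternative strategy would be to log the error and keep consuming the other sources; which one fits depends on whether a single failing source should abort the whole merge.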