Python and Trio, where producers are consumers, how to exit gracefully when the job is done?


I'm trying to make a simple web crawler using trio and asks. I use a nursery to start a couple of crawlers at once, and a memory channel to maintain a list of urls to visit.

Each crawler receives clones of both ends of that channel, so it can grab a url (via receive_channel), read it, and find and add new urls to be visited (via send_channel).

async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
    async with trio.open_nursery() as nursery:
        async with send_channel, receive_channel:
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())


async def crawler(send_channel, receive_channel):
    async for url in receive_channel:  # I'm a consumer!
        content = await ...
        urls_found = ...
        for u in urls_found:
            await send_channel.send(u)  # I'm a producer too!

In this scenario the consumers are also the producers. How do I stop everything gracefully?

The conditions for shutting everything down are:

  • the channel is empty, AND
  • all crawlers are stuck at the top of their for loop, waiting for a url to appear in receive_channel (which... won't happen anymore)

I tried with async with send_channel inside crawler(), but could not find a good way to do it (sketched below). I also tried to find some different approach (a memory-channel-bound worker pool, etc.), with no luck there as well.
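
A minimal sketch of that attempt (my reconstruction; the crawling work is elided), which shows why it can never terminate:

async def crawler(send_channel, receive_channel):
    async with send_channel, receive_channel:  # close my clones on exit
        async for url in receive_channel:
            pass  # fetch the url, send newly found urls...
    # never reached: the loop only ends once every send clone is closed,
    # but each crawler only closes its clone after its own loop ends --
    # every crawler waits on all the others, forever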


There are 2 answers

Anders E. Andersen (BEST ANSWER)

There are at least two problems here.

First is your assumption about stopping when the channel is empty. If you allocate the memory channel with a buffer size of 0 (instead of math.inf), it will always be empty: you are only able to hand off a url if a crawler is ready to receive it.

This creates problem number two: if you ever find more urls than you have allocated crawlers, your application will deadlock.

The reason is that, since a crawler won't be able to hand off all of its found urls, it will never get back to receiving new urls: it is stuck waiting for another crawler to take one of its urls.

This gets even worse, because if one of the other crawlers finds new urls, it too will get stuck behind the crawler that is already waiting to hand off its urls, and none of them will ever be able to take one of the urls that are waiting to be processed.
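
A minimal reproduction of the deadlock (my sketch, with integers standing in for urls):

import trio

async def crawler(send, recv):
    async for n in recv:
        # each "url" yields two new ones; with a zero-size buffer every
        # send blocks until some *other* task is ready to receive
        await send.send(n + 1)
        await send.send(n + 2)

async def main():
    send, recv = trio.open_memory_channel(0)  # unbuffered
    async with trio.open_nursery() as nursery:
        nursery.start_soon(crawler, send.clone(), recv.clone())
        nursery.start_soon(crawler, send.clone(), recv.clone())
        await send.send(0)  # kick things off
        # soon both crawlers are blocked in send() and nobody is left
        # receiving: the program hangs forever

trio.run(main)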

Relevant portion of the documentation:

https://trio.readthedocs.io/en/stable/reference-core.html#buffering-in-channels

Assuming we fix that, where do we go next?

You probably need to keep a set of all visited urls, to make sure you don't visit them again.
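
For example (a sketch; the set would be created in main and passed to every crawler; since all Trio tasks run in one thread, no locking is needed as long as there is no await between the membership check and the add):

async def crawler(send_channel, receive_channel, visited):
    async for url in receive_channel:
        if url in visited:
            continue  # already seen, skip it
        visited.add(url)
        ...  # fetch the url and send newly found urls, as before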

To actually figure out when to stop, instead of closing the channels, it is probably a lot easier to simply cancel the nursery.

Let's say we modify the main loop like this:

async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
    active_workers = trio.CapacityLimiter(3)  # number of workers
    async with trio.open_nursery() as nursery:
        async with send_channel, receive_channel:
            await send_channel.send('https://start-url')  # seed the crawl
            nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
            nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
            nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
            while True:
                await trio.sleep(1)  # give the workers a chance to start up
                # done when no worker holds a token and no urls are queued
                if active_workers.borrowed_tokens == 0 and send_channel.statistics().current_buffer_used == 0:
                    nursery.cancel_scope.cancel()  # all done!

Now we need to modify the crawlers slightly, to pick up a token when active.

async def crawler(active_workers, send_channel, receive_channel):
    async for url in receive_channel:  # I'm a consumer!
        async with active_workers:  # hold a token while processing this url
            content = await ...
            urls_found = ...
            for u in urls_found:
                await send_channel.send(u)  # I'm a producer too!

Other things to consider:

You may want to use send_channel.send_nowait(u) in the crawler. Since you have an unbounded buffer, there is no chance of a trio.WouldBlock exception, and the behaviour of not having a checkpoint trigger on every send might be desirable. That way you know for sure that a particular url is fully processed and all new urls have been added before other tasks get a chance to grab a new url, or the parent task gets a chance to check if the work is done.
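
A sketch of that variant (fetch_and_parse is a hypothetical stand-in for the elided fetching code):

async def crawler(active_workers, send_channel, receive_channel):
    async for url in receive_channel:
        async with active_workers:
            urls_found = await fetch_and_parse(url)  # hypothetical helper
            for u in urls_found:
                # synchronous send: never raises trio.WouldBlock on an
                # unbounded channel, and executes no checkpoint, so no other
                # task runs until every found url has been enqueued
                send_channel.send_nowait(u)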

Paweł Lis

This is a solution I came up with when I tried to reorganize the problem:

async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
 
    limit = trio.CapacityLimiter(3)

    async with send_channel:
        await send_channel.send(('https://start-url', send_channel.clone()))
    #HERE1

    async with trio.open_nursery() as nursery:
        async for url, send_channel in receive_channel:  #HERE3
            await nursery.start(crawler, url, send_channel, limit)

async def crawler(url, send_channel, limit, *, task_status=trio.TASK_STATUS_IGNORED):
    task_status.started()
    async with limit, send_channel:
        content = await ...
        links = ...
        for link in links:
            await send_channel.send((link, send_channel.clone()))
    #HERE2

(I left out the skipping of already-visited urls.)

Here there are no 3 long-lived consumers; instead there are at most 3 consumers running whenever there is enough work for them.

At #HERE1 the send_channel is closed (because it was used as a context manager); the only thing keeping the channel alive is a clone of it, sitting inside the channel itself.

At #HERE2 the clone is also closed (again, because of the context manager). If the channel is empty, then that clone was the last thing keeping the channel alive. The channel dies, and the for loop ends (#HERE3).

UNLESS there were urls found, in which case they were added to the channel, together with more clones of send_channel that will keep the channel alive long enough to process those urls.
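
The mechanism can be seen in isolation in this little sketch (my illustration, with one message standing in for the whole crawl):

import trio

async def main():
    send, recv = trio.open_memory_channel(1)
    async with send:
        await send.send(('https://start-url', send.clone()))
    # the original send endpoint is closed; the clone travelling inside
    # the channel is now the only thing keeping the channel open
    async for url, clone in recv:
        print(url)
        await clone.aclose()  # last clone closed -> channel dies -> loop ends

trio.run(main)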

Both this and Anders E. Andersen's solutions feel hacky to me: one uses sleep and statistics(), the other creates clones of send_channel and puts them into the channel itself... it feels like a software implementation of a Klein bottle to me. I will probably look for some other approach.