My app follows a Producer and Consumer pattern. There's a Producer, 2 task (A, B), and a Consumer.
Producer reads an sql table and outputs to A & B. They in turn performs some task on that output and sends to Consumer. Consumer reads from A & B and then outputs to an s3 file.
There is a memory channel between Producer & A, Producer & B, A & Consumer, B & Consumer.
This is how I terminate my program right now (once producers have exhausted all the rows in the SQL table):
async with trio.open_nursery() as nursery:
    nursery.start_soon(A.run)
    nursery.start_soon(B.run)
    nursery.start_soon(consumer.run)
    
    while True:
        rowcount = await producer_task.run()
        if not rowcount:
            logging.info('Producer exiting loop')
            # Terminate the tasks' inner loops
            for t in (A, B, consumer):
                t.is_terminated = True
            # Let subtasks wrap up
            await trio.sleep(60 * 5)
            # Terminate all send_channels, subtasks can be stuck receiving.
            for channel in all_channels.keys():
                await channel.aclose()
            break
This is the base class of A & B:
class AsyncSubtask(object):
    def __init__(self, receive_channel, send_channel):
        self.receive_channel = receive_channel
        self.send_channel = send_channel
        self.is_terminated = False
    async def run(self):
        try:
            while not self.is_terminated:
                input_work = await self.receive_channel.receive()
                if input_work:
                    output_work = await self.loop(input_work)
                    await self.send_channel.send(output_work)
                    logging.info(f'{self.__class__.__name__} -> {self.get_logging_name(output_work)}')
                else:
                    logging.warning(f'{self.__class__.__name__} received empty inputs.')
        except trio.EndOfChannel:
            pass
        logging.info(f'{self.__class__.__name__} exiting loop')
    async def loop(self, work):
        raise NotImplementedError
    def get_logging_name(self, output_work):
        return len(output_work)
Right now, my program is not exiting successfully due to this error:
Traceback (most recent call last):
  File "/myfile/bin/fetch_ott_features.py", line 386, in <module>
    trio.run(parent)
  File "/myfile/lib/python3.6/site-packages/trio/_core/_run.py", line 1896, in run
    raise runner.main_task_outcome.error
  File "/myfile/bin/fetch_ott_features.py", line 379, in parent
    break
  File "/myfile/lib/python3.6/site-packages/trio/_core/_run.py", line 741, in __aexit__
    raise combined_error_from_nursery
  File "/myfile/lib/python3.6/site-packages/a9_ifs_user_reach/async_util.py", line 27, in run
    await self.send_channel.send(output_work)
  File "/myfile/lib/python3.6/site-packages/trio/_channel.py", line 178, in send
    await trio.lowlevel.wait_task_rescheduled(abort_fn)
  File "/myfile/lib/python3.6/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
    return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
  File "/myfile/lib/python3.6/site-packages/outcome/_sync.py", line 111, in unwrap
    raise captured_error
trio.BrokenResourceError
Note: the break in line 379 is referencing to the last line in the async with trio.open_nursery() as nursery block above.
It seems the way I am terminating my program is causing this issue. I have ran this on two separate occasions and gotten the same error.
How should I terminate my program without causing this error?
 
                        
The traceback says that the
BrokenResourceErroris coming from a call toawait send_channel.send(...).send_channel.sendraises this error if you're trying to send some data and the receiving end of the channel was already closed.I suspect the issue is that when do
...you're actually closing all of the channels, including one that's still in use.
If you have data flowing from Producer -> A/B -> Consumer, then the usual pattern for handling shutdown would be:
async for blah in receive_channel: ..., then the loop will terminate once everything is done. If you're callingreceive_channel.receive(), then you'll get anEndOfChannelexception that you can catch.tl;dr: If you write each of your tasks like:
...then it should all clean itself up and terminate automatically and reliably.