My app follows a producer/consumer pattern. There is a Producer, two tasks (A and B), and a Consumer.
The Producer reads an SQL table and outputs to A and B. They in turn perform some task on that output and send the result to the Consumer. The Consumer reads from A and B and then outputs to an S3 file.
There is a memory channel between Producer & A, Producer & B, A & Consumer, and B & Consumer.
This is how I terminate my program right now (once the Producer has exhausted all the rows in the SQL table):
async with trio.open_nursery() as nursery:
    nursery.start_soon(A.run)
    nursery.start_soon(B.run)
    nursery.start_soon(consumer.run)
    while True:
        rowcount = await producer_task.run()
        if not rowcount:
            logging.info('Producer exiting loop')
            # Terminate the tasks' inner loops
            for t in (A, B, consumer):
                t.is_terminated = True
            # Let subtasks wrap up
            await trio.sleep(60 * 5)
            # Terminate all send_channels, subtasks can be stuck receiving.
            for channel in all_channels.keys():
                await channel.aclose()
            break
This is the base class of A & B:
class AsyncSubtask(object):
    def __init__(self, receive_channel, send_channel):
        self.receive_channel = receive_channel
        self.send_channel = send_channel
        self.is_terminated = False

    async def run(self):
        try:
            while not self.is_terminated:
                input_work = await self.receive_channel.receive()
                if input_work:
                    output_work = await self.loop(input_work)
                    await self.send_channel.send(output_work)
                    logging.info(f'{self.__class__.__name__} -> {self.get_logging_name(output_work)}')
                else:
                    logging.warning(f'{self.__class__.__name__} received empty inputs.')
        except trio.EndOfChannel:
            pass
        logging.info(f'{self.__class__.__name__} exiting loop')

    async def loop(self, work):
        raise NotImplementedError

    def get_logging_name(self, output_work):
        return len(output_work)
Right now, my program is not exiting successfully due to this error:
Traceback (most recent call last):
  File "/myfile/bin/fetch_ott_features.py", line 386, in <module>
    trio.run(parent)
  File "/myfile/lib/python3.6/site-packages/trio/_core/_run.py", line 1896, in run
    raise runner.main_task_outcome.error
  File "/myfile/bin/fetch_ott_features.py", line 379, in parent
    break
  File "/myfile/lib/python3.6/site-packages/trio/_core/_run.py", line 741, in __aexit__
    raise combined_error_from_nursery
  File "/myfile/lib/python3.6/site-packages/a9_ifs_user_reach/async_util.py", line 27, in run
    await self.send_channel.send(output_work)
  File "/myfile/lib/python3.6/site-packages/trio/_channel.py", line 178, in send
    await trio.lowlevel.wait_task_rescheduled(abort_fn)
  File "/myfile/lib/python3.6/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
    return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
  File "/myfile/lib/python3.6/site-packages/outcome/_sync.py", line 111, in unwrap
    raise captured_error
trio.BrokenResourceError
Note: the break in line 379 refers to the last line of the async with trio.open_nursery() as nursery block above.
It seems the way I am terminating my program is causing this issue. I have run this on two separate occasions and gotten the same error.
How should I terminate my program without causing this error?
The traceback says that the BrokenResourceError is coming from a call to await send_channel.send(...). send_channel.send raises this error if you're trying to send some data and the receiving end of the channel was already closed.
I suspect the issue is that when you loop over all_channels and call aclose() on each one, you're actually closing all of the channels, including one that's still in use.
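If you have data flowing from Producer -> A/B -> Consumer, then the usual pattern for handling shutdown is for each stage to close its own end of a channel when it is done, so that the shutdown propagates downstream. When A/B have read everything the Producer sent, they are notified that the channel was closed. If you're using async for blah in receive_channel: ..., then the loop will terminate once everything is done. If you're calling receive_channel.receive(), then you'll get an EndOfChannel exception that you can catch.
tl;dr: If you write each of your tasks so that it closes its channels when it finishes, then it should all clean itself up and terminate automatically and reliably.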