How to implement graceful shutdown for an asyncio socket server upon KeyboardInterrupt?

81 views Asked by At

I have following server code:

import asyncio
import json

async def conn_handler(reader, writer):
    """Serve one client: echo back every length-prefixed JSON message.

    Each frame is a 2-byte big-endian length followed by that many bytes of
    JSON. The handler re-serialises the decoded message and sends it back in
    the same framing. It stops when the peer closes the connection (EOF, even
    in the middle of a frame) or sends a zero-length header. It always closes
    its side of the connection on the way out, including when the task is
    cancelled during server shutdown.
    """
    addr = writer.get_extra_info('peername')
    print(f"{addr} connected")

    try:
        while True:
            # readexactly() instead of read(2): read() may return a single
            # byte when the header arrives split across TCP segments, which
            # would decode to a wrong length and desynchronise the stream.
            try:
                header = await reader.readexactly(2)
            except asyncio.IncompleteReadError:
                break  # peer closed the connection

            data_len = int.from_bytes(header, byteorder="big")
            if data_len == 0:
                break

            try:
                data = await reader.readexactly(data_len)
            except asyncio.IncompleteReadError:
                break  # peer closed mid-frame

            msg = json.loads(data)
            print(f"received: {msg}")

            msg = json.dumps(msg)
            payload = msg.encode()
            # The length prefix counts bytes on the wire, not characters.
            msg_len = len(payload).to_bytes(2, byteorder="big")

            writer.write(msg_len + payload)
            await writer.drain()
            print(f"sent: {msg}")

        print(f"{addr} closed")
    finally:
        # close() is synchronous, so it never delays a KeyboardInterrupt or a
        # cancellation that is propagating through this task.
        writer.close()


async def start_server():
    """Listen on 0.0.0.0:10999 and serve clients with conn_handler until cancelled."""
    server = await asyncio.start_server(conn_handler, "0.0.0.0", 10999)

    print(f'Serving on {server.sockets[0].getsockname()}')

    # `async with` closes the listening socket when serve_forever() exits,
    # including when it is cancelled during shutdown.
    async with server:
        await server.serve_forever()


# Guard the entry point (as the client script does) so that importing this
# module does not start the server as a side effect.
if __name__ == "__main__":
    try:
        asyncio.run(start_server())
    except KeyboardInterrupt:
        print("keyboard interrupt occured")

and following client code:

import asyncio
import json
import logging
import pprint
import uuid


# IDs of requests that have been sent but not yet answered. A set gives O(1)
# membership tests and removals, which matters for large message files.
outstanding_msgs = set()
# Total number of messages loaded from msgs.json; set by send_msgs_loop().
msgs_count = 0

async def send_msgs_loop(writer):
    """Send every message from msgs.json, each tagged with a fresh UUID.

    Each message's id is registered in ``outstanding_msgs`` before the frame
    is written, so read_msgs_loop() can match the echoed reply. A frame is a
    2-byte big-endian byte length followed by the encoded JSON payload.
    """
    global msgs_count

    # JSON files are UTF-8 (RFC 8259); don't depend on the platform locale.
    # The file is closed before any network I/O starts.
    with open("msgs.json", encoding="utf-8") as f:
        msgs = json.load(f)
    msgs_count = len(msgs)

    for msg in msgs:
        msg["id"] = str(uuid.uuid4())
        outstanding_msgs.add(msg["id"])

        msg = json.dumps(msg)
        payload = msg.encode()
        # The length prefix counts bytes on the wire, not characters.
        msg_len = len(payload).to_bytes(2, byteorder="big")

        writer.write(msg_len + payload)
        await writer.drain()

        print(f"sent: {msg}")

async def read_msgs_loop(reader):
    """Consume echoed frames until every sent message has been answered.

    Returns when all ``msgs_count`` responses have been matched, when the
    server closes the connection (EOF, including mid-frame), or when it sends
    a zero-length header. Replies whose id is not outstanding are ignored.
    """
    received_count = 0
    while True:
        # readexactly(): read(2) can return only one byte of a split header.
        try:
            header = await reader.readexactly(2)
        except asyncio.IncompleteReadError:
            break  # server closed the connection
        msg_len = int.from_bytes(header, byteorder="big")

        if msg_len == 0:
            break

        try:
            data = await reader.readexactly(msg_len)
        except asyncio.IncompleteReadError:
            break  # server closed mid-frame

        msg = json.loads(data)
        if msg["id"] in outstanding_msgs:
            received_count += 1
            outstanding_msgs.remove(msg["id"])
            print(f"received: {msg}")

        if not outstanding_msgs and received_count == msgs_count:
            print("all responses received")
            break

async def start_client():
    """Connect to the echo server, run the sender and reader concurrently, then close.

    The connection is closed in a ``finally`` block, so it is released even
    when one of the loops raises or the task is cancelled.
    """
    reader, writer = await asyncio.open_connection("localhost", 10999)
    try:
        await asyncio.gather(send_msgs_loop(writer), read_msgs_loop(reader))
    finally:
        writer.close()
        await writer.wait_closed()

if __name__ == '__main__':
    asyncio.run(start_client(), debug=False)

I need some guidance on how to implement a graceful shutdown of the server upon KeyboardInterrupt.

The try/except around asyncio.run works fine when no clients are connected. However, if a client is connected, I get exceptions in conn_handler. Typically, the output looks like this:

future: <Task finished name='Task-5' coro=<conn_handler() done, defined at C:\Users\tsku1460\myPython\pycon_2024\async_server.py:4> exception=KeyboardInte
rrupt()>
Traceback (most recent call last):
  File "C:\Users\tsku1460\myPython\pycon_2024\async_server.py", line 40, in <module>
    asyncio.run(start_server())
  File "C:\Program Files\Python310\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 636, in run_until_complete
    self.run_forever()
  File "C:\Program Files\Python310\lib\asyncio\windows_events.py", line 321, in run_forever
    super().run_forever()
  File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 603, in run_forever
    self._run_once()
  File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 1909, in _run_once
    handle._run()
  File "C:\Program Files\Python310\lib\asyncio\events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "C:\Users\tsku1460\myPython\pycon_2024\async_server.py", line 25, in conn_handler
    print(f"sent: {msg}")
KeyboardInterrupt

If I put a try/except in conn_handler, I need to press Ctrl+C twice: once for the except clause in conn_handler and once for the main try/except around asyncio.run. If I re-raise the exception from conn_handler like this:

   except KeyboardInterrupt as e:
        raise

Then I get this:

keyboard interrupt occured
Task exception was never retrieved
future: <Task finished name='Task-5' coro=<conn_handler() done, defined at C:\Users\tsku1460\myPython\pycon_2024\async_server.py:4> exception=KeyboardInte
rrupt()>
Traceback (most recent call last):
  File "C:\Users\tsku1460\myPython\pycon_2024\async_server.py", line 41, in <module>
    asyncio.run(start_server())
  File "C:\Program Files\Python310\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 636, in run_until_complete
    self.run_forever()
  File "C:\Program Files\Python310\lib\asyncio\windows_events.py", line 321, in run_forever
    super().run_forever()
  File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 603, in run_forever
    self._run_once()
  File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 1909, in _run_once
    handle._run()
  File "C:\Program Files\Python310\lib\asyncio\events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "C:\Users\tsku1460\myPython\pycon_2024\async_server.py", line 18, in conn_handler
    print(f"received: {msg}")
KeyboardInterrupt

So the exception did propagate to the main try/except block (the text "keyboard interrupt occured" is printed), but something is still wrong: asyncio also reports "Task exception was never retrieved".

Here is sample content for msgs.json:

[
        {"amount": 10, "card": "1213212312", "terminal": "ABC"},
        {"amount": 25, "card": "5555555552", "terminal": "CDE"},
        {"amount": 30, "card": "4444444442", "terminal": "EFG"},
        {"amount": 10, "card": "1213212312", "terminal": "ABC"},
        {"amount": 25, "card": "5555555552", "terminal": "CDE"},
        {"amount": 30, "card": "4444444442", "terminal": "EFG"},
        {"amount": 10, "card": "1213212312", "terminal": "ABC"},
        {"amount": 25, "card": "5555555552", "terminal": "CDE"},
        {"amount": 30, "card": "4444444442", "terminal": "EFG"},
        {"amount": 10, "card": "1213212312", "terminal": "ABC"},
        {"amount": 25, "card": "5555555552", "terminal": "CDE"},
        {"amount": 30, "card": "4444444442", "terminal": "EFG"},
        {"amount": 10, "card": "1213212312", "terminal": "ABC"},
        {"amount": 25, "card": "5555555552", "terminal": "CDE"},
        {"amount": 30, "card": "4444444442", "terminal": "EFG"},
        {"amount": 10, "card": "1213212312", "terminal": "ABC"},
        {"amount": 25, "card": "5555555552", "terminal": "CDE"},
        {"amount": 30, "card": "4444444442", "terminal": "EFG"},
        {"amount": 10, "card": "1213212312", "terminal": "ABC"},
        {"amount": 25, "card": "5555555552", "terminal": "CDE"},
        {"amount": 30, "card": "4444444442", "terminal": "EFG"},
        {"amount": 10, "card": "1213212312", "terminal": "ABC"},
        {"amount": 25, "card": "5555555552", "terminal": "CDE"}
]

In general, it can be any JSON array. This is just a short example; I am using a really large one.

2

There are 2 answers

0
coolio On

I believe I have put the puzzle together. I had to check how things work in asyncio under the hood. The first thing to realize is that a KeyboardInterrupt (or SystemExit) raised inside a Task is not only stored in that task: it is also re-raised out of the event loop. So it reaches the main try/except block around asyncio.run. (Ordinary exceptions are just stored in the task.)

Now check the asyncio.run implementation:

def run(main, *, debug=None):
    """Execute the coroutine and return the result.

    This function runs the passed coroutine, taking care of
    managing the asyncio event loop and finalizing asynchronous
    generators.

    This function cannot be called when another asyncio event loop is
    running in the same thread.

    If debug is True, the event loop will be run in debug mode.

    This function always creates a new event loop and closes it at the end.
    It should be used as a main entry point for asyncio programs, and should
    ideally only be called once.

    Whether *main* returns or raises, the cleanup below always runs: tasks
    still pending are cancelled, async generators are finalized, the
    default executor is shut down, and only then is the loop closed.

    Example:

        async def main():
            await asyncio.sleep(1)
            print('hello')

        asyncio.run(main())
    """
    # Refuse to nest: each call owns a brand-new loop in this thread.
    if events._get_running_loop() is not None:
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")

    if not coroutines.iscoroutine(main):
        raise ValueError("a coroutine was expected, got {!r}".format(main))

    loop = events.new_event_loop()
    try:
        events.set_event_loop(loop)
        if debug is not None:
            loop.set_debug(debug)
        # If this raises (e.g. KeyboardInterrupt), control skips the return
        # and goes straight to the finally block below.
        return loop.run_until_complete(main)
    finally:
        try:
            # Cancel every task that is still pending and run the loop
            # until they have unwound.
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
            loop.run_until_complete(loop.shutdown_default_executor())
        finally:
            # Detach and close the loop even if the cleanup above failed.
            events.set_event_loop(None)
            loop.close()

Because the cleanup sits in a finally block, whenever run_until_complete finishes — normally or by raising an exception such as KeyboardInterrupt — all remaining tasks are cancelled via _cancel_all_tasks(loop).

And now the last piece of the puzzle, _cancel_all_tasks:

def _cancel_all_tasks(loop):
    """Cancel the tasks collected from *loop* and wait until they finish.

    Any collected task that ends with an exception instead of being
    cancelled is reported through loop.call_exception_handler() with the
    message 'unhandled exception during asyncio.run() shutdown'.
    """
    # NOTE(review): per the asyncio docs, all_tasks() returns only tasks that
    # are not yet done, so a task that already finished with an exception is
    # not collected (and therefore not reported) here.
    to_cancel = tasks.all_tasks(loop)
    if not to_cancel:
        return

    for task in to_cancel:
        task.cancel()

    # Run the loop until every cancelled task has unwound; return_exceptions
    # keeps one task's failure from aborting the wait for the others.
    loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))

    for task in to_cancel:
        if task.cancelled():
            continue
        # The task swallowed the cancellation but finished with an error.
        if task.exception() is not None:
            loop.call_exception_handler({
                'message': 'unhandled exception during asyncio.run() shutdown',
                'exception': task.exception(),
                'task': task,
            })

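This explains where the "Task exception was never retrieved" message comes from, although not directly. tasks.all_tasks() returns only tasks that are not yet done. The handler task that was interrupted by KeyboardInterrupt has already finished with that exception, so _cancel_all_tasks() neither cancels nor inspects it. Nobody ever calls that task's exception() method, and when the task object is destroyed asyncio logs "Task exception was never retrieved". (_cancel_all_tasks() itself only reports still-pending tasks that end with an error, using the message 'unhandled exception during asyncio.run() shutdown'.)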

I found 2 ways how to handle this:

First: save the task in which the KeyboardInterrupt is caught to a global variable, and re-raise the exception so that it propagates to the main try/except block. All remaining tasks will be cancelled (asyncio.run guarantees this via _cancel_all_tasks). The last thing to do is retrieve the KeyboardInterrupt from the saved task by calling its exception() method, which stops asyncio from reporting it as never retrieved.

import asyncio
import json


# Task in which conn_handler caught a KeyboardInterrupt (None until that
# happens). The top-level code calls .exception() on it after asyncio.run()
# so that asyncio does not report the exception as never retrieved.
exception_task = None

async def conn_handler(reader, writer):
    """Echo length-prefixed JSON messages back to one client.

    A frame is a 2-byte big-endian length followed by that many bytes of
    JSON; each decoded message is re-serialised and sent back in the same
    framing. The loop ends on EOF (even mid-frame) or a zero-length header.

    If a KeyboardInterrupt is raised while this handler runs, the current
    task is stored in the module-level ``exception_task`` and the exception
    is re-raised, so the top-level code can retrieve it after asyncio.run().
    The client connection is closed on every exit path.
    """
    try:
        addr = writer.get_extra_info('peername')
        print(f"{addr} connected")

        while True:
            # readexactly(): read(2) may return only one byte of a header
            # split across TCP segments and desynchronise the stream.
            try:
                header = await reader.readexactly(2)
            except asyncio.IncompleteReadError:
                break  # peer closed the connection
            data_len = int.from_bytes(header, byteorder="big")

            if data_len == 0:
                break

            try:
                data = await reader.readexactly(data_len)
            except asyncio.IncompleteReadError:
                break  # peer closed mid-frame

            msg = json.loads(data)
            print(f"received: {msg}")

            msg = json.dumps(msg)
            payload = msg.encode()
            # The length prefix counts bytes on the wire, not characters.
            msg_len = len(payload).to_bytes(2, byteorder="big")

            writer.write(msg_len + payload)
            await writer.drain()
            print(f"sent: {msg}")

        print(f"{addr} closed")
    except KeyboardInterrupt:
        global exception_task
        exception_task = asyncio.current_task()
        raise
    finally:
        # Synchronous close: it does not await, so it cannot delay the
        # KeyboardInterrupt (or a cancellation) propagating out of the task.
        writer.close()


async def start_server():
    """Listen on 0.0.0.0:10999 and serve clients with conn_handler until cancelled."""
    server = await asyncio.start_server(conn_handler, "0.0.0.0", 10999)

    print(f'Serving on {server.sockets[0].getsockname()}')

    # `async with` closes the listening socket when serve_forever() exits.
    async with server:
        await server.serve_forever()


# Guard the entry point so importing this module does not start the server.
if __name__ == "__main__":
    try:
        asyncio.run(start_server())
    except KeyboardInterrupt:
        if exception_task is not None:
            # Retrieve the handler task's KeyboardInterrupt so asyncio does
            # not log "Task exception was never retrieved" for it.
            exception_task.exception()
        # Report the interrupt whether or not a client was connected.
        print("keyboard interrupt occured")

Another option is to set a custom exception handler. More about this here: https://superfastpython.com/asyncio-task-exception-was-never-retrieved/

Please let me know if there is another (better) way to implement graceful shutdown for an asyncio socket server.

0
Aaron On

Handling SIGINT / Ctrl + C was added to asyncio.run() in Python 3.11.

The general solution is to

  • use context manager async with/with if available, or

  • wrap the graceful shutdown in a finally block. As shown above, _cancel_all_tasks() cancels all remaining tasks and runs the loop again, so each task resumes with asyncio.CancelledError raised at its current await point. This lets finally blocks release resources properly (assuming the user does not hit Ctrl+C again during this phase).

    async def start_client():
        """Connect, run the sender and reader concurrently, and always close the connection."""
        # Connect before entering `try`: if open_connection() fails there is
        # no writer yet, and closing it in `finally` would raise an
        # UnboundLocalError that hides the real connection error.
        reader, writer = await asyncio.open_connection("localhost", 10999)
        try:
            await asyncio.gather(send_msgs_loop(writer), read_msgs_loop(reader))
        finally:
            writer.close()
            await writer.wait_closed()
    

KeyboardInterrupt may be raised anywhere. If it is raised in user code, you have a chance to catch it. If it is raised inside asyncio's own code, you cannot catch it until it propagates out of asyncio.run().

The change in Python 3.11 should also avoid the warning message, because CancelledError is raised in the tasks instead of KeyboardInterrupt. The Python 3.11 runners.py may be backported to older versions.

Here is an example exception handler, for demonstration:

def custom_handler(loop, context):
    """Event-loop exception handler that silences KeyboardInterrupt reports.

    A context whose "exception" entry is a KeyboardInterrupt is dropped on
    purpose; every other context goes to loop.default_exception_handler().
    """
    reported = context.get("exception")
    is_interrupt = isinstance(reported, KeyboardInterrupt)
    if not is_interrupt:
        # Anything that is not Ctrl+C keeps the default reporting.
        loop.default_exception_handler(context)

async def main():
    """Entry coroutine: install custom_handler on the running loop, then do the work."""
    # asyncio.run() builds a fresh event loop for this call, so the handler
    # can only be installed from inside it, not before asyncio.run() starts.
    current_loop = asyncio.get_running_loop()
    current_loop.set_exception_handler(custom_handler)

    ...

asyncio.run(main())

Note: if Ctrl+C is hit twice or more, KeyboardInterrupt is still raised somewhere, even in Python 3.11 — just as in earlier versions.