I have following server code:
import asyncio
import json
async def conn_handler(reader, writer):
addr = writer.get_extra_info('peername')
print(f"{addr} connected")
while True:
data_len = await reader.read(2)
data_len = int.from_bytes(data_len, byteorder="big")
if data_len == 0:
break
if data := await reader.readexactly(data_len):
msg = json.loads(data)
print(f"received: {msg}")
msg = json.dumps(msg)
msg_len = len(msg).to_bytes(2, byteorder="big")
writer.write(msg_len + msg.encode())
await writer.drain()
print(f"sent: {msg}")
print(f"{addr} closed")
async def start_server():
server = await asyncio.start_server(conn_handler, "0.0.0.0", 10999)
print(f'Serving on {server.sockets[0].getsockname()}')
async with server:
await server.serve_forever()
try:
asyncio.run(start_server())
except KeyboardInterrupt:
print("keyboard interrupt occured")
and following client code:
import asyncio
import json
import logging
import pprint
import uuid
outstanding_msgs = []
msgs_count = 0
async def send_msgs_loop(writer):
with open("msgs.json") as f:
msgs = json.load(f)
global msgs_count
msgs_count = len(msgs)
for msg in msgs:
msg["id"] = str(uuid.uuid4())
outstanding_msgs.append(msg["id"])
msg = json.dumps(msg)
msg_len = len(msg)
msg_len = msg_len.to_bytes(2, byteorder="big")
writer.write(msg_len + msg.encode())
await writer.drain()
print(f"sent: {msg}")
async def read_msgs_loop(reader):
received_count = 0
while True:
msg_len = await reader.read(2)
msg_len = int.from_bytes(msg_len, byteorder="big")
if msg_len == 0:
break
if msg := await reader.readexactly(msg_len):
msg = json.loads(msg)
if msg["id"] in outstanding_msgs:
received_count = received_count + 1
outstanding_msgs.remove(msg["id"])
print(f"received: {msg}")
if not outstanding_msgs and received_count == msgs_count:
print("all responses received")
break
async def start_client():
reader, writer = await asyncio.open_connection("localhost", 10999)
await asyncio.gather(send_msgs_loop(writer), read_msgs_loop(reader))
writer.close()
await writer.wait_closed()
if __name__ == '__main__':
asyncio.run(start_client(), debug=False)
I need some guidance how to implement graceful shutdown on a server upon KeyboardInterrupt.
The try except on asyncio.run works fine when there are no connected clients. However, if there is a connected client, I encounter exceptions in conn_handler. Typically, it looks like this:
future: <Task finished name='Task-5' coro=<conn_handler() done, defined at C:\Users\tsku1460\myPython\pycon_2024\async_server.py:4> exception=KeyboardInte
rrupt()>
Traceback (most recent call last):
File "C:\Users\tsku1460\myPython\pycon_2024\async_server.py", line 40, in <module>
asyncio.run(start_server())
File "C:\Program Files\Python310\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 636, in run_until_complete
self.run_forever()
File "C:\Program Files\Python310\lib\asyncio\windows_events.py", line 321, in run_forever
super().run_forever()
File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 603, in run_forever
self._run_once()
File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 1909, in _run_once
handle._run()
File "C:\Program Files\Python310\lib\asyncio\events.py", line 80, in _run
self._context.run(self._callback, *self._args)
File "C:\Users\tsku1460\myPython\pycon_2024\async_server.py", line 25, in conn_handler
print(f"sent: {msg}")
KeyboardInterrupt
If I place try-except in conn_handler, then I need to press CTRL+C twice (once it is caught by conn_handler and once by main try-catch block around asyncio.run). If I propagate the exception from conn_handler like this:
except KeyboardInterrupt as e:
raise
Then i get this:
keyboard interrupt occured
Task exception was never retrieved
future: <Task finished name='Task-5' coro=<conn_handler() done, defined at C:\Users\tsku1460\myPython\pycon_2024\async_server.py:4> exception=KeyboardInte
rrupt()>
Traceback (most recent call last):
File "C:\Users\tsku1460\myPython\pycon_2024\async_server.py", line 41, in <module>
asyncio.run(start_server())
File "C:\Program Files\Python310\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 636, in run_until_complete
self.run_forever()
File "C:\Program Files\Python310\lib\asyncio\windows_events.py", line 321, in run_forever
super().run_forever()
File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 603, in run_forever
self._run_once()
File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 1909, in _run_once
handle._run()
File "C:\Program Files\Python310\lib\asyncio\events.py", line 80, in _run
self._context.run(self._callback, *self._args)
File "C:\Users\tsku1460\myPython\pycon_2024\async_server.py", line 18, in conn_handler
print(f"received: {msg}")
KeyboardInterrupt
So the exception was propagated to main try-catch block (we can see the text "keyboard interrupt occurred") but still something is wrong.
Here is sample content for msgs.json:
[
{"amount": 10, "card": "1213212312", "terminal": "ABC"},
{"amount": 25, "card": "5555555552", "terminal": "CDE"},
{"amount": 30, "card": "4444444442", "terminal": "EFG"},
{"amount": 10, "card": "1213212312", "terminal": "ABC"},
{"amount": 25, "card": "5555555552", "terminal": "CDE"},
{"amount": 30, "card": "4444444442", "terminal": "EFG"},
{"amount": 10, "card": "1213212312", "terminal": "ABC"},
{"amount": 25, "card": "5555555552", "terminal": "CDE"},
{"amount": 30, "card": "4444444442", "terminal": "EFG"},
{"amount": 10, "card": "1213212312", "terminal": "ABC"},
{"amount": 25, "card": "5555555552", "terminal": "CDE"},
{"amount": 30, "card": "4444444442", "terminal": "EFG"},
{"amount": 10, "card": "1213212312", "terminal": "ABC"},
{"amount": 25, "card": "5555555552", "terminal": "CDE"},
{"amount": 30, "card": "4444444442", "terminal": "EFG"},
{"amount": 10, "card": "1213212312", "terminal": "ABC"},
{"amount": 25, "card": "5555555552", "terminal": "CDE"},
{"amount": 30, "card": "4444444442", "terminal": "EFG"},
{"amount": 10, "card": "1213212312", "terminal": "ABC"},
{"amount": 25, "card": "5555555552", "terminal": "CDE"},
{"amount": 30, "card": "4444444442", "terminal": "EFG"},
{"amount": 10, "card": "1213212312", "terminal": "ABC"},
{"amount": 25, "card": "5555555552", "terminal": "CDE"}
]
In general can by any json array. This is just an short example. I am using a really large one.
I believe I put the puzzle together. I had to check how things work in asyncio under the hood. The first thing we have to realize is that unhandled exceptions are propagated from Task to a higher level. So it will be captured by main try-catch block around
asyncio.runNow check the
asyncio.runimplementation:If there is an exception in execution of
run_until_completethen all tasks are cancelled via_cancel_all_tasks(loop)And now the last piece of puzzle. The
_cancel_all_tasks:So now we have the source of
Task exception was never retrievedmessage.I found 2 ways how to handle this:
First one: Save the task on which the KeyboardInterrupt is captured to global variable and re-raise the exception so it will be propagated to to main try-cath block. All tasks will be canceled (will be ensured via _cancel_all_tasks in asyncio.run). The last thing you have to do is to retrieve the KeyboardInterrupt exception from task we saved previously.
Another option is to set custom exception handler. More about this here: https://superfastpython.com/asyncio-task-exception-was-never-retrieved/
Guys pls let me know if there is another (better) way to graceful shutdown for an asyncio socket server.