I have a python code that connects to RMQ and consumes messages. I just noticed that when the RMQ goes down my code just hangs at the close connection (inside aioamqp library) and stuck there and the only solution is to restart my code.
I added different exceptions but non solved my problem.
My code:
def main(*args, **kwargs):
log.info('main args: %s', kwargs)
try:
close_event=asyncio.Event()
event_loop = asyncio.get_event_loop()
event_loop.create_task(sim_server.run())
#launch rpc server and wait till finish
event_loop.run_until_complete(rpc_server(**kwargs, on_close=close_event))
log.info("after event_loop.run_until_complete")
except ProcessorException as e:
log.error('Processor exception: %s', e)
except Exception as e:
log.exception('Unexpected exception: %s', e)
except KeyboardInterrupt:
pass
finally:
log.info("inside finally")
#sent event to stop rpc server
close_event.set()
#block till all tasks are done
event_loop.run_forever()
event_loop.close()
async def rpc_server(url: str, exchange:str , queue_name: str, service_id: str, on_close: asyncio.Event):
log.info('amqp: %s, exchange: %s, queue_name: %s, service_id: %s', url, exchange, queue_name, service_id)
#aioamqp expects the virtualhost with the /. This is not accourding to the AMQP spec
if '%2f' in url:
url=url.replace('%2f', '/')
o=urlparse(url)
backoff=1.0
max_attempts=5
retry_interval=1
while True: #reconnection loop
try:
transport, protocol = await aioamqp.connect(host=o.hostname, port=o.port,
login=o.username, password=o.password,
virtualhost=o.path[1:])
channel = await protocol.channel()
await channel.queue_declare(queue_name=queue_name)
#bind to shared queue
await channel.queue_bind(queue_name=queue_name, exchange_name=exchange, routing_key=queue_name)
#bind to unique queue
await channel.queue_bind(queue_name=queue_name, exchange_name=exchange, routing_key=service_id)
await channel.basic_consume(on_request, queue_name=queue_name)
log.info('Awaiting RPC requests')
#wait till on_close event
await on_close.wait()
log.info('close amqp connection')
await protocol.close()
transport.close()
except (OSError, aioamqp.exceptions.AmqpClosedConnection) as e: #AMQP server not running or starting up
log.error('AMQP connection error: %s', e)
if not max_attempts:
raise ProcessorException('AMPQ error: max attempt reached: %s', e)
log.info('retry in %s seconds', backoff)
await asyncio.sleep(backoff)
max_attempts-=1
backoff*=2
except ConnectionResetError as e:
log.error('Connection reset by broker: %s', e)
log.info('Retrying indefinitely until the broker is up...')
await asyncio.sleep(retry_interval) # Adjust the retry interval as needed
except Exception as e:
log.error('Unexpected exception: %s', e)
import traceback
traceback.print_exc()
else:
log.info("inside break")
break
log.info("after while")
Log:
2024-02-20 12:27:39,835 ~~INFO ~~__main__ ~~Awaiting RPC requests ~~[processor.py:376]
2024-02-20 12:27:50,768 ~~WARNING ~~aioamqp.protocol ~~Connection lost exc=ConnectionResetError(104, 'Connection reset by peer') ~~[protocol.py:115]
2024-02-20 12:27:50,769 ~~INFO ~~aioamqp.protocol ~~Close connection ~~[protocol.py:314]
Thats it, that is all the logs that I get after stopping/killing the RMQ. The aioamqp library is getting stuck and I don't know how to restart the connection.