How to reconnect to RabbitMQ after the RMQ goes down

37 views Asked by At

I have a python code that connects to RMQ and consumes messages. I just noticed that when the RMQ goes down my code just hangs at the close connection (inside aioamqp library) and stuck there and the only solution is to restart my code.

I added different exceptions but non solved my problem.

My code:

def main(*args, **kwargs):
  log.info('main args: %s', kwargs)
  
  try:
    close_event=asyncio.Event()
    event_loop = asyncio.get_event_loop()
    event_loop.create_task(sim_server.run())
    #launch rpc server and wait till finish
    event_loop.run_until_complete(rpc_server(**kwargs, on_close=close_event))
    log.info("after event_loop.run_until_complete")
  except ProcessorException as e:
    log.error('Processor exception: %s', e)
  except Exception as e:
    log.exception('Unexpected exception: %s', e)
  except KeyboardInterrupt:
    pass
  finally:
    log.info("inside finally")
    #sent event to stop rpc server
    close_event.set()
    #block till all tasks are done
    event_loop.run_forever()
    event_loop.close()


async def rpc_server(url: str, exchange:str , queue_name: str, service_id: str,  on_close: asyncio.Event):

  log.info('amqp: %s, exchange: %s, queue_name: %s, service_id: %s', url, exchange, queue_name, service_id)
  #aioamqp expects the virtualhost with the /. This is not accourding to the AMQP spec
  if '%2f' in url:
    url=url.replace('%2f', '/')

  o=urlparse(url)    

  backoff=1.0
  max_attempts=5
  retry_interval=1

  while True: #reconnection loop
    try:
      transport, protocol = await aioamqp.connect(host=o.hostname, port=o.port, 
                                                  login=o.username, password=o.password,
                                                  virtualhost=o.path[1:])
      channel = await protocol.channel()
      await channel.queue_declare(queue_name=queue_name)
      #bind to shared queue
      await channel.queue_bind(queue_name=queue_name, exchange_name=exchange, routing_key=queue_name)
      #bind to unique queue
      await channel.queue_bind(queue_name=queue_name, exchange_name=exchange, routing_key=service_id)
      await channel.basic_consume(on_request, queue_name=queue_name)
      log.info('Awaiting RPC requests')
      #wait till on_close event
      await on_close.wait()
      log.info('close amqp connection')
      await protocol.close()
      transport.close()      
    except (OSError, aioamqp.exceptions.AmqpClosedConnection) as e: #AMQP server not running or starting up
      log.error('AMQP connection error: %s', e)
      if not max_attempts:
        raise ProcessorException('AMPQ error: max attempt reached: %s', e)

      log.info('retry in %s seconds', backoff)        
      await asyncio.sleep(backoff)
      max_attempts-=1
      backoff*=2
    except ConnectionResetError as e:
        log.error('Connection reset by broker: %s', e)
        log.info('Retrying indefinitely until the broker is up...')
        await asyncio.sleep(retry_interval)  # Adjust the retry interval as needed
    except Exception as e:
        log.error('Unexpected exception: %s', e)
        import traceback
        traceback.print_exc()
    else:
      log.info("inside break")
      break
  log.info("after while")

Log:

2024-02-20 12:27:39,835 ~~INFO  ~~__main__             ~~Awaiting RPC requests ~~[processor.py:376]
2024-02-20 12:27:50,768 ~~WARNING  ~~aioamqp.protocol     ~~Connection lost exc=ConnectionResetError(104, 'Connection reset by peer') ~~[protocol.py:115]
2024-02-20 12:27:50,769 ~~INFO  ~~aioamqp.protocol     ~~Close connection ~~[protocol.py:314]

Thats it, that is all the logs that I get after stopping/killing the RMQ. The aioamqp library is getting stuck and I don't know how to restart the connection.

0

There are 0 answers