How to handle an unexpected connection close in aio-pika

108 views Asked by At

Do you have any clues on how to handle an unexpected close of the RabbitMQ connection? aio-pika handles it by itself and tries to reconnect every 5 seconds. I don't need that behavior; instead, it should raise an exception that shuts the service down. I've been searching for an answer, but unfortunately I haven't found anything.

class AMQPHandler:
    """Hold the AMQP connection and channel and set up the consumer queues.

    ``connection`` and ``channel`` stay ``None`` until ``init`` has completed
    successfully.
    """

    def __init__(self) -> None:
        # Populated by ``init`` only after the whole topology has been set up.
        self.connection: AbstractRobustConnection | None = None
        self.channel: AbstractChannel | None = None

    async def init(self) -> None:
        """Connect to the broker, declare the exchange and queues, and consume.

        For every key in ``BINDING_KEYS`` a durable queue is declared, bound
        to the exchange using the queue name as routing key, and
        ``self.handle_message`` is registered as its consumer callback.

        Raises:
            Whatever ``aio_pika`` raises while connecting or declaring. If a
            step fails after the connection was opened, the connection is
            closed before the exception propagates.
        """
        # Function-scope import kept as in the original; presumably it defers
        # loading ``settings`` until the handler starts -- TODO confirm.
        import settings

        logger.info("Initializing AMQP handler")

        config = settings.BaseMessageBrokerSettings
        # NOTE(review): a robust connection reconnects on its own after an
        # unexpected close. If the service should fail fast instead, look at
        # the non-robust ``aio_pika.connect`` or a close callback -- confirm
        # against the aio-pika documentation before switching.
        connection = await aio_pika.connect_robust(
            config.get_dsn(),
            # Inside a coroutine this is the loop that is running us.
            loop=asyncio.get_running_loop(),
            timeout=config.CONNECTION_TIMEOUT,
        )
        try:
            channel = await connection.channel()

            await channel.set_qos(prefetch_count=10)

            exchange = await channel.declare_exchange(
                config.EXCHANGE_NAME,
                config.EXCHANGE_TYPE,
                auto_delete=config.EXCHANGE_AUTO_DELETE,
                durable=True,
            )
            for key in config.BINDING_KEYS:
                q_name = (
                    f"{key}.{config.PREFIX_BINDING_KEYS}"
                    if config.PREFIX_BINDING_KEYS
                    else key
                )
                queue = await channel.declare_queue(name=q_name, durable=True)
                await queue.bind(exchange, q_name)
                await queue.consume(self.handle_message)
                logger.info("Queue declared", extra={"queue": q_name})
        except BaseException:
            # The connection is not stored on ``self`` yet, so nobody else
            # could close it; release it here (also on cancellation) and let
            # the original error propagate.
            await connection.close()
            raise

        self.connection = connection
        self.channel = channel

        logger.info("AMQP handler initialized")
0

There are 0 answers