How to run an async basic_consume in pika

1.2k views Asked by At

I would like to run an on message callback async in a pika basic_consume. Is that possible and how? We are already running an asyncio loop for other tasks and this consumer uses httpx with an async connection to call internal services.

Here is our current Consumer class:

class Consumer:
    """
    https://www.devmashup.com/creating-a-rabbitmq-consumer-in-python/
    """

    connection: AsyncioConnection
    channel: Any
    routing_key: str

    def __init__(self, routing_key) -> None:
        self.connection = self.__create_connection()
        self.channel = self.connection.channel()
        self.__create_exchange()
        self.routing_key = routing_key

    @staticmethod
    def __create_connection():
        credentials = PlainCredentials(
            settings.mqtt_vhost_user, settings.mqtt_vhost_password
        )

        parameters = ConnectionParameters(
            settings.mqtt_host, settings.mqtt_port, settings.mqtt_vhost, credentials
        )

        return AsyncioConnection(parameters)

    def close_connection(self):
        self.connection.close()

    def __create_exchange(self):
        self.channel.exchange_declare(
            exchange=settings.mqtt_exchange,
            exchange_type=settings.mqtt_exchange_type,
            passive=False,
            durable=True,
            auto_delete=False,
            internal=False,
        )

    def consume(self, message_received_callback):
        logger.info(f"Started consumer for {self.routing_key}")

        self.channel.queue_declare(
            queue=self.routing_key,
            passive=False,
            durable=True,
            exclusive=False,
            auto_delete=False,
        )

        self.channel.queue_bind(
            queue=self.routing_key,
            exchange=settings.mqtt_exchange,
            routing_key=self.routing_key,
        )

        async def consume_message(channel, method, properties, body):
            await message_received_callback(body)
            channel.basic_ack(delivery_tag=method.delivery_tag)

        self.channel.basic_consume(
            self.routing_key,
            consume_message,
        )

        self.channel.start_consuming()
1

There are 1 answers

0
Philippe On

For async + rabbitmq, I would suggest you aiormq. It's async out of the box, it works smoothly with asyncio and the api is intuitive/similar when you already know pika.

Here is how you can create a simple consumer