I would like to run an on message callback async in a pika basic_consume. Is that possible and how? We are already running an asyncio loop for other tasks and this consumer uses httpx with an async connection to call internal services.
Here is our current Consumer class:
class Consumer:
"""
https://www.devmashup.com/creating-a-rabbitmq-consumer-in-python/
"""
connection: AsyncioConnection
channel: Any
routing_key: str
def __init__(self, routing_key) -> None:
self.connection = self.__create_connection()
self.channel = self.connection.channel()
self.__create_exchange()
self.routing_key = routing_key
@staticmethod
def __create_connection():
credentials = PlainCredentials(
settings.mqtt_vhost_user, settings.mqtt_vhost_password
)
parameters = ConnectionParameters(
settings.mqtt_host, settings.mqtt_port, settings.mqtt_vhost, credentials
)
return AsyncioConnection(parameters)
def close_connection(self):
self.connection.close()
def __create_exchange(self):
self.channel.exchange_declare(
exchange=settings.mqtt_exchange,
exchange_type=settings.mqtt_exchange_type,
passive=False,
durable=True,
auto_delete=False,
internal=False,
)
def consume(self, message_received_callback):
logger.info(f"Started consumer for {self.routing_key}")
self.channel.queue_declare(
queue=self.routing_key,
passive=False,
durable=True,
exclusive=False,
auto_delete=False,
)
self.channel.queue_bind(
queue=self.routing_key,
exchange=settings.mqtt_exchange,
routing_key=self.routing_key,
)
async def consume_message(channel, method, properties, body):
await message_received_callback(body)
channel.basic_ack(delivery_tag=method.delivery_tag)
self.channel.basic_consume(
self.routing_key,
consume_message,
)
self.channel.start_consuming()
For async + rabbitmq, I would suggest you aiormq. It's async out of the box, it works smoothly with asyncio and the api is intuitive/similar when you already know pika.
Here is how you can create a simple consumer