RabbitMQ and GCP Pub/Sub integration

225 views Asked by At

I am working on integration RabbitMQ and GCP Pub/Sub. I am using Pika library with Python. Connection is established by BlockingConnection.
My consumer is running as separate thread. Max expected messages workload is up to 100 msg/s.

According some initial tests what I performed seems that solution is working, but I have some doubts how callback function is constructed. I am invoking publish method to publish to Pub/Sub in GCP and then in block try, basic_ack on processed message.

If somebody has experience in this area, could I ask to give some opinion about my solution and may be some examples how it could be implemented.

    def consume_data_callback(self, basic_deliver, body):
        
        # ... some code

        future = self.publisher.publish(topic_path, self.payload.SerializeToString())
        try:
            message_id = future.result(timeout=1)
            self.channel.basic_ack(basic_deliver.delivery_tag)
        except Exception as e:
            future.cancel()
            _logger.error("Result after publishing Pub/Sub with: {}".format(e)) 

Thank you for answer.

1

There are 1 answers

0
Mel On

I personally haven't tried it yet but I did searched for references that might help you. This RabbitMQ - Publish/Subscribe article has a callback code sample that can help you confirm if what you have written is an appropriate solution.

GitHub full code version:

#!/usr/bin/env python
import os
import pika
import sys


def main():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    channel.queue_bind(exchange='logs', queue=queue_name)

    def callback(ch, method, properties, body):
        print(f" [x] {body.decode()}")

    print(' [*] Waiting for logs. To exit press CTRL+C')
    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)

    channel.start_consuming()


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)