Making RabbitMQ message consumption non-blocking with Tornado

1.6k views Asked by At
# WebSocket handler that, for every incoming websocket message, publishes the
# message to the RabbitMQ queue 'hello' and then consumes from that same queue,
# writing each consumed body back to this websocket.
# NOTE(review): json and time are used below but not imported in this block;
# presumably they are imported elsewhere in the file -- confirm.
class WSHandler(tornado.websocket.WebSocketHandler):
    # Class-level list shared by every handler instance; open() appends each
    # new connection to it. No removal is visible in this block.
    clients = []
    # Called with the `name` argument, which is stored as self.id and later
    # sent as the 'web_socket' field of the published payload.
    def open(self, name):
        # WSHandler.clients.append(self)
        # liveWebSockets.add(self)
        self.id = name
        self.clients.append(self)
        # self.application.pc.add_event_listener(self)
        print 'new connection'



    # Handles one websocket message: publish it to 'hello', then consume from
    # 'hello' until a message whose 'message' field equals "crack" arrives.
    def on_message(self, message):
        print 'message received:  %s' % message
        # Only prints the reversed message; nothing is written to the
        # websocket at this point.
        print 'sending back message: %s' % message[::-1]


        # pika sending message
        # (pika is imported locally, on every call to on_message)
        import pika
        connection = pika.BlockingConnection(pika.ConnectionParameters(
                       'localhost'))
        channel = connection.channel()
        # clients.append(self)
        channel.queue_declare(queue='hello')
        # print dir(self)
        # Payload carries the id stored in open() plus the raw message text.
        message_rabbit_mq = {
                                'web_socket': self.id,
                                'message': message
                            }
        message_rabbit_mq = json.dumps(message_rabbit_mq)                    
        # Published with an empty exchange name and routing key 'hello'.
        channel.basic_publish(exchange='',
                              routing_key='hello',
                              body=message_rabbit_mq)
        # The publishing connection is closed; a second one is opened below
        # for consuming.
        connection.close()


        # NOTE(review): rabbit_connect is not defined anywhere in this class;
        # unless it is provided elsewhere this call raises AttributeError
        # before the consume section runs -- confirm.
        self.rabbit_connect()
        # def rabbit_connect():
        # pika receving message
        connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
        channel = connection.channel()

        channel.queue_declare(queue='hello')

        # Invoked for each message delivered from 'hello'.
        def callback(ch, method, properties, body):
            print(" [x] Received %r" % body)
            # Echo the consumed body to this websocket connection.
            self.write_message(body)
            # Blocking 4-second pause after every delivered message.
            time.sleep(4)
            body_obj =  json.loads(body)
            # NOTE(review): this tests the raw `body`, not the parsed
            # `body_obj`; if body is a string this is a substring check
            # rather than a key lookup -- confirm intent.
            if 'message' in body:
                # "crack" is the sentinel message text that ends consuming.
                if body_obj['message'] == "crack":
                    channel.stop_consuming()

        # NOTE(review): no_ack=True presumably disables acknowledgements
        # (callback-first, pre-1.0 pika signature) -- confirm against the
        # installed pika version.
        channel.basic_consume(callback,
                        queue='hello',
                        no_ack=True)

        # This call does not return until consuming is stopped, so the rest
        # of on_message (and, per the question text, further websocket
        # messages) waits on it.
        channel.start_consuming()

        # Reached only after stop_consuming() has been called in callback.
        self.write_message("closed reference")

The problem in the code above is this part:

    # Excerpt of the consume section of on_message: opens a blocking
    # connection, declares the 'hello' queue and consumes from it.
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='hello')

    # Invoked for each message delivered from 'hello'.
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        # Echo the consumed body to the websocket, then pause 4 seconds.
        self.write_message(body)
        time.sleep(4)
        body_obj =  json.loads(body)
        # NOTE(review): tests the raw `body`, not the parsed `body_obj`.
        if 'message' in body:
            # "crack" is the sentinel message text that ends consuming.
            if body_obj['message'] == "crack":
                channel.stop_consuming()

    channel.basic_consume(callback,
                    queue='hello',
                    no_ack=True)

    # Does not return until stop_consuming() is called in callback.
    channel.start_consuming()

The part above blocks the rest of the logic in the on_message function, so further WebSocket messages from the client are not processed. How do I get this part to run asynchronously with the rest of the logic?

1

There is 1 answer

0
Gabriele Santomaggio On BEST ANSWER

Try using this code:

https://github.com/Gsantomaggio/rabbitmqexample/tree/master/webSocketPython

def threaded_rmq():
    """Declare ``my_queue`` and consume from it until consuming stops.

    Used as the ``target`` of a ``Thread`` in the ``__main__`` section, so
    the ``start_consuming()`` loop runs on that thread instead of the one
    that starts it.

    Relies on ``channel`` and ``consumer_callback`` being defined at module
    level elsewhere (see the linked example repository); neither is shown
    here.
    """
    channel.queue_declare(queue="my_queue")
    logging.info('consumer ready, on my_queue')
    # Callback-first signature with no_ack, as in the question's code.
    channel.basic_consume(consumer_callback, queue="my_queue", no_ack=True)
    channel.start_consuming()

and

if __name__ == "__main__":
    logging.info('Starting thread RabbitMQ')
    threadRMQ = Thread(target=threaded_rmq)
    threadRMQ.start()