class WSHandler(tornado.websocket.WebSocketHandler):
clients = []
def open(self, name):
# WSHandler.clients.append(self)
# liveWebSockets.add(self)
self.id = name
self.clients.append(self)
# self.application.pc.add_event_listener(self)
print 'new connection'
def on_message(self, message):
print 'message received: %s' % message
# Reverse Message and send it back
print 'sending back message: %s' % message[::-1]
# pika sending message
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
# clients.append(self)
channel.queue_declare(queue='hello')
# print dir(self)
message_rabbit_mq = {
'web_socket': self.id,
'message': message
}
message_rabbit_mq = json.dumps(message_rabbit_mq)
channel.basic_publish(exchange='',
routing_key='hello',
body=message_rabbit_mq)
connection.close()
self.rabbit_connect()
# def rabbit_connect():
# pika receving message
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
self.write_message(body)
time.sleep(4)
body_obj = json.loads(body)
if 'message' in body:
if body_obj['message'] == "crack":
channel.stop_consuming()
channel.basic_consume(callback,
queue='hello',
no_ack=True)
channel.start_consuming()
self.write_message("closed reference")
The problem in the above code is
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
self.write_message(body)
time.sleep(4)
body_obj = json.loads(body)
if 'message' in body:
if body_obj['message'] == "crack":
channel.stop_consuming()
channel.basic_consume(callback,
queue='hello',
no_ack=True)
channel.start_consuming()
the above part blocks the rest of the logic in on_message function. how do i get the above part run async with the rest of the logic? which makes further websocket messages from client not process through.
Try using this code:
https://github.com/Gsantomaggio/rabbitmqexample/tree/master/webSocketPython
def threaded_rmq(): channel.queue_declare(queue="my_queue") logging.info('consumer ready, on my_queue') channel.basic_consume(consumer_callback, queue="my_queue", no_ack=True) channel.start_consuming()
and
if __name__ == "__main__": logging.info('Starting thread RabbitMQ') threadRMQ = Thread(target=threaded_rmq) threadRMQ.start()