How do I run pyzmq and a webserver in one ioloop?

628 views Asked by At

I want to write a single-threaded program that hosts a web server using Tornado and also receives messages on a ZMQ socket (using the PyZMQ Tornado event loop: http://learning-0mq-with-pyzmq.readthedocs.org/en/latest/pyzmq/multisocket/tornadoeventloop.html), but I'm not sure how to structure it. Should I be using

from zmq.eventloop import ioloop

or

from tornado.ioloop import IOLoop

or both?

2

There are 2 answers

0
vladkens On BEST ANSWER

Before any Tornado imports, you need to import zmq.eventloop.ioloop and call the zmq.eventloop.ioloop.install() function. After that, you can import Tornado's ioloop and use it.

See: http://zeromq.github.io/pyzmq/eventloop.html

0
Dhiraj Dhule On

Here is an example of a Tornado HTTP server combined with ZeroMQ PUB/SUB sockets.

#!/usr/bin/env python
import json
import tornado
import tornado.web
import zmq
from tornado import httpserver
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream

# Install pyzmq's IOLoop so the Tornado web server and the ZMQStream objects
# below are driven by the same event loop.
# NOTE(review): pyzmq documents that install() has to run before Tornado's
# IOLoop singleton is first created -- confirm nothing imported above
# instantiates it.
ioloop.install()
# Rebind the ``ioloop`` attribute of the tornado package to pyzmq's module, so
# the tornado.ioloop.IOLoop lookup at the bottom of the script resolves to
# zmq.eventloop.ioloop.IOLoop.
tornado.ioloop = ioloop
import sys


def ping_remote():
    """Publish a keep-alive "Ping" and print a progress dot.

    Meant to be run periodically while waiting for replies: according to the
    original author, network routers between the Raspberry Pi and the cloud
    server close the socket when no data is exchanged for a long time.

    Relies on the module-level ``pub_inst`` publisher being set up already.
    """
    # "##" is the request id that the receiving callback treats as a ping.
    pub_inst.send_json_data(req_id="##", msg="Ping")

    # Emit one dot per ping and flush so it shows up immediately.
    out = sys.stdout
    out.write('.')
    out.flush()


# Maps a request id string (e.g. "0#") to the Handler instance that is still
# waiting for its reply. Handler.get adds the entry; SensorCb removes it once
# the matching reply has been written back to the client.
pending_requests = {}


class ZMQSub(object):
    """Receiving side of the bridge: a bound ZMQ SUB socket inside a ZMQStream.

    Each multipart message that arrives on the socket is handed to
    ``callback`` by the event loop the stream is attached to.
    """

    def __init__(self, callback):
        """Bind the SUB socket on tcp://*:8081 and register ``callback``.

        :param callback: callable that receives each incoming multipart
            message (the sequence of its frames).
        """
        self.callback = callback
        context = zmq.Context()
        socket = context.socket(zmq.SUB)
        # socket.connect('tcp://127.0.0.1:5559')
        socket.bind('tcp://*:8081')
        self.stream = ZMQStream(socket)
        self.stream.on_recv(self.callback)
        # An empty prefix subscribes to every topic. The value has to be
        # bytes: pyzmq refuses a unicode string for this option on Python 3,
        # while b"" is exactly the same value as "" on Python 2.
        socket.setsockopt(zmq.SUBSCRIBE, b"")

    def shutdown_zmq_sub(self):
        """Close the receiving stream."""
        self.stream.close()


class ZMQPub(object):
    """Sending side of the bridge: a ZMQ PUB socket bound on tcp://*:8082.

    The socket is wrapped in a ZMQStream so that sends go through the
    event loop.
    """

    def __init__(self):
        """Create the PUB socket, bind it and wrap it in a ZMQStream."""
        context = zmq.Context()
        socket = context.socket(zmq.PUB)
        socket.bind('tcp://*:8082')
        self.publish_stream = ZMQStream(socket)

    def send_json_data(self, msg, req_id):
        """Publish ``msg`` as the two-frame message ``[str(req_id), msg]``.

        :param msg: payload frame. It is sent as given -- despite the method
            name no JSON encoding happens here, so callers serialize first.
        :param req_id: request id; its string form is used as the topic frame.
        """
        topic = str(req_id)
        self.publish_stream.send_multipart([topic, msg])

    def shutdown_zmq_pub(self):
        """Close the publishing stream."""
        self.publish_stream.close()

    # The close method was originally named after the subscriber class
    # (copy/paste); keep that name working for existing callers.
    shutdown_zmq_sub = shutdown_zmq_pub


def SensorCb(msg):
    """Handle one two-frame message received from the sensor side.

    The first frame is the request id (channel id), the second the payload.
    A request id of "##" marks a keep-alive ping and is only reported.
    Anything else is decoded as JSON and written to the HTTP handler waiting
    under that id in ``pending_requests``; that handler is then finished and
    forgotten.

    :param msg: iterable of exactly two frames ``(request id, payload)``.
    :raises ValueError: if ``msg`` does not hold exactly two frames or the
        payload is not valid JSON.
    """
    # decode message from the Raspberry Pi and the channel ID.
    key, payload = msg
    if key == "##":
        print("received ping")
        return

    reply = json.loads(payload)

    # One dict operation instead of a keys() scan, a lookup and a del. The
    # entry is removed before writing, so a failing write cannot leave a
    # stale handler behind.
    req_inst = pending_requests.pop(key, None)
    if req_inst is None:
        print("no such request")
        print(pending_requests)
        return

    req_inst.write(reply)
    req_inst.finish()


class Handler(tornado.web.RequestHandler):
    """HTTP handler that forwards a GET to the sensor side and waits for the reply.

    Each instance takes a unique request id from the counter stored on the
    application, registers itself in ``pending_requests`` and publishes a
    "ServiceCall" message through the module-level ``pub_inst``. The response
    is written later by ``SensorCb`` when a message with the same id arrives.
    """

    def __init__(self, *args, **kwargs):
        super(Handler, self).__init__(*args, **kwargs)

        # get the unique req id
        self.req_id = str(self.application.req_id) + "#"
        self.application.req_id += 1

    def set_default_headers(self):
        """Attach the CORS headers to every response.

        Tornado calls this hook whenever it resets the response, including
        before an error page is produced, so the headers are no longer lost
        the way headers set once in ``__init__`` were.
        """
        self.set_header("Access-Control-Allow-Origin", "*")
        self.set_header("Access-Control-Allow-Headers", "x-requested-with")
        self.set_header('Access-Control-Allow-Methods', 'POST, GET, OPTIONS, PUT')

    @tornado.web.asynchronous
    def get(self):
        """Register this request as pending and publish the service call.

        The response is left open (asynchronous handler); it is finished by
        ``SensorCb`` once the reply for ``self.req_id`` is received.
        """
        print(self.request)
        if self.req_id not in pending_requests:
            pending_requests[self.req_id] = self
        else:
            # Ids come from a counter, so a collision is not expected.
            print("WTF")
        pub_inst.send_json_data(msg=json.dumps({"op": "ServiceCall"}), req_id=self.req_id)

    def on_connection_close(self):
        """Forget this request if the client disconnects before the reply.

        Without this the entry, and the handler it references, would stay in
        ``pending_requests`` for as long as no reply arrives.
        """
        super(Handler, self).on_connection_close()
        if pending_requests.get(self.req_id) is self:
            del pending_requests[self.req_id]


if __name__ == "__main__":
    # pub_inst must stay a module-level name: ping_remote() and Handler.get()
    # both look it up as a global.
    pub_inst = ZMQPub()
    sub_inst = ZMQSub(callback=SensorCb)

    # Every route must be a complete (pattern, handler) pair. The original
    # list also contained a bare r'/(.*)' pattern with no handler attached,
    # which is not a valid route spec, so it is left out; unmatched URLs get
    # Tornado's default 404 response.
    application = tornado.web.Application([(r'/get_sensor_data', Handler)])
    # Counter that Handler.__init__ uses to hand out unique request ids.
    application.req_id = 0

    server = httpserver.HTTPServer(application)
    port = 8080
    server.listen(port)
    # Formatted so the output matches the old two-argument print statement.
    print("%s %s" % ("Sensor server ready on port: ", port))

    # Send a keep-alive ping every 3000 ms.
    ping = ioloop.PeriodicCallback(ping_remote, 3000)
    ping.start()

    # Blocks here: one loop serves both the HTTP server and the ZMQ streams.
    tornado.ioloop.IOLoop.instance().start()