I have a tornado server which I am trying to make synchronous. I have a client which makes asynchronous requests to the server simultaneously. It pings the server every 5 seconds with a heartbeat and secondly, it makes a GET request for a job whenever it can.
On the server side, there is a thread-safe queue which contains jobs. It blocks for 20 seconds if the queue is empty. I want it to hold the connection and block for that 20 seconds and when it returns, it writes "No job" to the client. As soon as a job is available, it should immediately write it to the client since queue.get() would return. I want the heartbeats to continue happening in the background while this request is blocked. Here I am making two asynchronous requests to the server from the same client.
Here is a sample project I build which kind of simulates my issue.
Server:
import tornado.ioloop
import tornado.web
from queue import Queue
from tornado import gen
q = Queue()
class HeartBeatHandler(tornado.web.RequestHandler):
@gen.coroutine
def post(self):
print("Heart beat")
class JobHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self):
print("Job")
try:
job = yield q.get(block=True, timeout=20)
self.write(job)
except Exception as e:
self.write("No job")
def make_app():
return tornado.web.Application([
(r"/heartbeat", HeartBeatHandler),
(r"/job", JobHandler),
])
if __name__ == "__main__":
app = make_app()
app.listen(8888)
try:
tornado.ioloop.IOLoop.current().start()
except KeyboardInterrupt:
tornado.ioloop.IOLoop.current().stop()
Client:
import asyncio
from tornado import httpclient, gen
@gen.coroutine
def heartbeat_routine():
while True:
http_client = httpclient.AsyncHTTPClient()
heartbeat_request = httpclient.HTTPRequest("http://{}/heartbeat".format("127.0.0.1:8888"), method="POST",
body="")
try:
yield http_client.fetch(heartbeat_request)
yield asyncio.sleep(5)
except httpclient.HTTPError as e:
print("Heartbeat failed!\nError: {}".format(str(e)))
http_client.close()
@gen.coroutine
def worker_routine():
while True:
http_client = httpclient.AsyncHTTPClient(defaults=dict(request_timeout=180))
job_request = httpclient.HTTPRequest("http://{}/job".format("127.0.0.1:8888"), method="GET")
try:
response = yield http_client.fetch(job_request)
print(response.body)
except httpclient.HTTPError as e:
print("Heartbeat failed!\nError: {}".format(str(e)))
http_client.close()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
asyncio.ensure_future(heartbeat_routine())
asyncio.ensure_future(worker_routine())
loop.run_forever()
Questions:
- The problem is that the heartbeats also block for those 20 seconds while the queue.get() blocks. Which I do not want.
- As you can see in my client I set request timeout to 180 seconds. But that never seems to work with tornado. If you increase queue.get() timeout above 20 seconds, it returns error code saying request timed out.
If you use a thread-safe queue, you must use not use blocking operations from the IOLoop thread. Instead, run them in a thread pool:
Alternately, you could use Tornado's async (but thread-unsafe) queue, and use
IOLoop.add_callback
whenever you need to interact with the queue from another thread.There's some magic in the
AsyncHTTPClient
constructor, which tries to share existing instances when possible, but this means that constructor arguments are only effective the first time. Theworker_routine
is picking up the default instances created byheartbeat_routine
. Addforce_instance=True
to ensure you get a fresh client inworker_routine
(and call.close()
on it when you're done)