I'm using thread pool while using Tornado to do some work. This is the code:
common/thread_pool.py
import tornado.ioloop
class Worker(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self._queue = queue
def run(self):
logging.info('Worker start')
while True:
content = self._queue.get()
if isinstance(content, str) and content == 'quit':
break
#content: (func, args, on_complete)
func = content[0]
args = content[1]
on_complete = content[2]
resp = func(args)
tornado.ioloop.IOLoop.instance().add_callback(lambda: on_complete(resp))
#i dont know is correct to call this
#self._queue.task_done()
logging.info('Worker stop')
class WorkerPool(object):
_workers = []
def __init__(self, num):
self._queue = Queue.Queue()
self._size = num
def start(self):
logging.info('WorkerPool start %d' % self._size)
for _ in range(self._size):
worker = Worker(self._queue)
worker.start()
self._workers.append(worker)
def stop(self):
for worker in self._workers:
self._queue.put('quit')
for worker in self._workers:
worker.join()
logging.info('WorkerPool stopd')
def append(self, content):
self._queue.put(content)
gateway.py
import tornado.ioloop
import tornado.web
from common import thread_pool
workers = None
class MainServerHandler(tornado.web.RequestHandler):
@tornado.web.asynchronous
def get(self):
start_time = time.time()
method = 'get'
content = (self.handle, (method, self.request, start_time), self.on_complete)
workers.append(content)
@tornado.web.asynchronous
def post(self):
start_time = time.time()
method = 'post'
content = (self.handle, (method, self.request, start_time), self.on_complete)
workers.append(content)
def handle(self, args):
method, request, start_time = args
#for test, just return
return 'test test'
def on_complete(self, res):
logging.debug('on_complete')
self.write(res)
self.finish()
return
def main(argv):
global workers
workers = thread_pool.WorkerPool(conf_mgr.thread_num)
workers.start()
application = tornado.web.Application([(r"/", MainServerHandler)])
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()
if __name__ == "__main__":
main(sys.argv[1:])
When I make many concurrent requests, I get this error:
ERROR: 2014-09-15 18:04:03: ioloop.py:435 * 140500107065056 Exception in callback <tornado.stack_context._StackContextWrapper object at 0x7fc8b4d6b9f0>
Traceback (most recent call last):
File "/home/work/nlp_arch/project/ps/se/nlp-arch/gateway/gateway/../third-party/tornado-2.4.1/tornado/ioloop.py", line 421, in _run_callback
callback()
File "/home/work/nlp_arch/project/ps/se/nlp-arch/gateway/gateway/../common/thread_pool.py", line 39, in <lambda>
tornado.ioloop.IOLoop.instance().add_callback(lambda: on_complete(resp))
File "/home/work/nlp_arch/project/ps/se/nlp-arch/gateway/gateway/gateway.py", line 92, in on_complete
self.write(res)
File "/home/work/nlp_arch/project/ps/se/nlp-arch/gateway/gateway/../third-party/tornado-2.4.1/tornado/web.py", line 489, in write
raise RuntimeError("Cannot write() after finish(). May be caused "
RuntimeError: Cannot write() after finish(). May be caused by using async operations without the @asynchronous decorator.
But I didn't call write
after finish
. I'm also using the @asynchronous
decorator. At the same time, in the logs I see that write
/finish
is called by same thread.
The issue is with the way you're adding the callback to the I/O loop. Add it like this:
And the errors will go away.
You're seeing this strange behavior because when you use a lambda function, you're creating a closure in the local scope of the function, and the variables used in that closure get bound at the point the lambda is executed, not when its created. Consider this example:
Output:
Because your worker method is running in a while loop,
on_complete
ends up getting redefined several times, which also changes the value ofon_complete
inside the lambda. That means if one worker thread setson_complete
for a handler A, but then gets another task and setson_complete
for handler B prior to the callback set for handler A running, both callbacks end up up running handler B'son_complete
.If you really wanted to use a lambda, you could also avoid this by binding
on_complete
in the local scope of the lambda:But just adding the function and its argument directly is much nicer.