add_callback with thread pool, but Exception about "Cannot write() after finish()."

2.1k views Asked by At

I'm using thread pool while using Tornado to do some work. This is the code:

common/thread_pool.py

import tornado.ioloop

class Worker(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self._queue = queue

    def run(self):
        logging.info('Worker start')
        while True:
            content = self._queue.get()
            if isinstance(content, str) and content == 'quit':
                break
            #content: (func, args, on_complete)
            func = content[0]
            args = content[1]
            on_complete = content[2]
            resp = func(args)
            tornado.ioloop.IOLoop.instance().add_callback(lambda: on_complete(resp))
            #i dont know is correct to call this
            #self._queue.task_done()
        logging.info('Worker stop')

class WorkerPool(object):
    _workers = []
    def __init__(self, num):
        self._queue = Queue.Queue()
        self._size = num

    def start(self):
        logging.info('WorkerPool start %d' % self._size)
        for _ in range(self._size):
            worker = Worker(self._queue)
            worker.start()
            self._workers.append(worker)

    def stop(self):
        for worker in self._workers:
            self._queue.put('quit') 
        for worker in self._workers:
            worker.join()
        logging.info('WorkerPool stopd')

    def append(self, content):
        self._queue.put(content)

gateway.py

import tornado.ioloop
import tornado.web

from common import thread_pool

workers = None

class MainServerHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        start_time = time.time()
        method = 'get'
        content = (self.handle, (method, self.request, start_time), self.on_complete)
        workers.append(content)

    @tornado.web.asynchronous
    def post(self):
        start_time = time.time()
        method = 'post'
        content = (self.handle, (method, self.request, start_time), self.on_complete)
        workers.append(content)

    def handle(self, args):
        method, request, start_time = args
        #for test, just return
        return 'test test'

    def on_complete(self, res):
        logging.debug('on_complete')
        self.write(res)
        self.finish()
        return        

def main(argv):  
    global workers
    workers = thread_pool.WorkerPool(conf_mgr.thread_num)
    workers.start()

    application = tornado.web.Application([(r"/", MainServerHandler)])
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
    main(sys.argv[1:])

When I make many concurrent requests, I get this error:

ERROR: 2014-09-15 18:04:03: ioloop.py:435 * 140500107065056 Exception in callback <tornado.stack_context._StackContextWrapper object at 0x7fc8b4d6b9f0>

  Traceback (most recent call last):
     File "/home/work/nlp_arch/project/ps/se/nlp-arch/gateway/gateway/../third-party/tornado-2.4.1/tornado/ioloop.py", line 421, in _run_callback
       callback()
     File "/home/work/nlp_arch/project/ps/se/nlp-arch/gateway/gateway/../common/thread_pool.py", line 39, in <lambda>
       tornado.ioloop.IOLoop.instance().add_callback(lambda: on_complete(resp))
     File "/home/work/nlp_arch/project/ps/se/nlp-arch/gateway/gateway/gateway.py", line 92, in on_complete
       self.write(res)
     File "/home/work/nlp_arch/project/ps/se/nlp-arch/gateway/gateway/../third-party/tornado-2.4.1/tornado/web.py", line 489, in write
      raise RuntimeError("Cannot write() after finish().  May be caused "
  RuntimeError: Cannot write() after finish().  May be caused by using async operations without the @asynchronous decorator.

But I didn't call write after finish. I'm also using the @asynchronous decorator. At the same time, in the logs I see that write/finish is called by same thread.

1

There are 1 answers

3
dano On BEST ANSWER

The issue is with the way you're adding the callback to the I/O loop. Add it like this:

tornado.ioloop.IOLoop.instance().add_callback(on_complete, resp)

And the errors will go away.

You're seeing this strange behavior because when you use a lambda function, you're creating a closure in the local scope of the function, and the variables used in that closure get bound at the point the lambda is executed, not when its created. Consider this example:

funcs = []
def func(a):
    print a

for i in range(5):
   funcs.append(lambda: func(i))

for f in funcs:
    f()

Output:

4
4
4
4
4

Because your worker method is running in a while loop, on_complete ends up getting redefined several times, which also changes the value of on_complete inside the lambda. That means if one worker thread sets on_complete for a handler A, but then gets another task and sets on_complete for handler B prior to the callback set for handler A running, both callbacks end up up running handler B's on_complete.

If you really wanted to use a lambda, you could also avoid this by binding on_complete in the local scope of the lambda:

tornado.ioloop.IOLoop.instance().add_callback(lambda on_complete=on_complete: on_complete(resp))

But just adding the function and its argument directly is much nicer.