Using gevent.queue.Queue.get(): gevent.hub.LoopExit: 'This operation would block forever'


I've been trying to integrate event streaming into my Flask application for the past few days. It works well in local testing, but not when I run the application with uWSGI on my server. My code is basically built upon the example from Flask. I'm using Python 3.4.2.

The problem

When running the app on my uWSGI server, it raises gevent.hub.LoopExit: 'This operation would block forever' whenever a client tries to connect to the /streaming endpoint. My assumption is that this is caused by calling get() on a queue that stays empty indefinitely.

Full traceback:

Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/werkzeug/wsgi.py", line 691, in __next__
    return self._next()
  File "/usr/lib/python3/dist-packages/werkzeug/wrappers.py", line 81, in _iter_encoded
    for item in iterable:
  File "./voting/__init__.py", line 49, in gen
    result = queue.get(block=True)
  File "/usr/local/lib/python3.4/dist-packages/gevent/queue.py", line 284, in get
    return self.__get_or_peek(self._get, block, timeout)
  File "/usr/local/lib/python3.4/dist-packages/gevent/queue.py", line 261, in __get_or_peek
    result = waiter.get()
  File "/usr/local/lib/python3.4/dist-packages/gevent/hub.py", line 878, in get
    return self.hub.switch()
  File "/usr/local/lib/python3.4/dist-packages/gevent/hub.py", line 609, in switch
    return greenlet.switch(self)
gevent.hub.LoopExit: ('This operation would block forever', <Hub at 0x7f717f40f5a0 epoll default pending=0 ref=0 fileno=6>)

My code

The /streaming endpoint:

@app.route("/streaming", methods=["GET", "OPTIONS"])
def streaming():
    def gen():
        queue = Queue()
        subscriptions.add_subscription(session_id, queue)

        try:
            while True:
                result = queue.get()      # Where the Exception is raised
                ev = ServerSentEvent(json.dumps(result["data"]), result["type"])
                yield ev.encode()

        except GeneratorExit:  # TODO Need a better method to detect disconnecting
            subscriptions.remove_subscription(session_id, queue)

    return Response(gen(), mimetype="text/event-stream")

Adding an event to the queue:

def notify():
    msg = {"type": "users", "data": db_get_all_registered(session_id)}
    subscriptions.add_item(session_id, msg)      # Adds the item to the relevant queues.

gevent.spawn(notify)

As mentioned above, it runs fine locally when served by gevent's WSGIServer with the werkzeug debugger:

from app import app
from gevent.wsgi import WSGIServer
from werkzeug.debug import DebuggedApplication

a = DebuggedApplication(app, evalex=True)
server = WSGIServer(("", 5000), a)
server.serve_forever()

What I've tried

  • Monkey-patching with monkey.patch_all().

  • Switching from Queue to JoinableQueue.

  • gevent.sleep(0) in combination with Queue.get().

Answer by Jason Madden (best answer):

That exception basically means that there are no other greenlets running in that loop/thread to switch to. So when the greenlet goes to block (queue.get()), the hub has nowhere else to go and nothing else to do, and it raises LoopExit rather than hanging forever.
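To see the condition in isolation, here is a minimal sketch that should reproduce the error outside of Flask and uWSGI. The function name is made up for illustration; the only assumption is that no other greenlet, timer or socket watcher is active when get() is called.

```python
from gevent.hub import LoopExit
from gevent.queue import Queue


def blocking_get_with_nothing_else_scheduled():
    """Call get() on an empty queue while the hub has no other work."""
    queue = Queue()
    try:
        # No producer greenlet exists, so nothing could ever wake this call.
        queue.get()
    except LoopExit as exc:
        return exc
    return None


error = blocking_get_with_nothing_else_scheduled()
print(type(error).__name__, error.args[0])
```

The call fails immediately instead of blocking, which matches the traceback in the question.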

The same code would work in gevent's WSGIServer because the server itself is a greenlet that's running the socket.accept loop, so there's always another greenlet to switch to. But apparently uWSGI doesn't work that way.

The way to fix this is to arrange for there to be other greenlets running. For example, instead of spawning a greenlet to notify on demand, arrange for such a greenlet to already be running and blocking on its own queue.
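A rough sketch of that arrangement, not a drop-in fix: work_queue, subscriber_queue and notifier are hypothetical names, and a single subscriber queue stands in for the subscriptions object from the question. The notifier greenlet is started once and blocks on its own queue; other code only puts messages on that queue instead of spawning a greenlet per notification.

```python
import gevent
from gevent.queue import Queue

work_queue = Queue()        # hypothetical: messages waiting to be delivered
subscriber_queue = Queue()  # hypothetical: one streaming client's queue


def notifier():
    """Long-running greenlet that forwards messages to the subscriber."""
    while True:
        msg = work_queue.get()     # blocks until a message is handed over
        if msg is None:            # sentinel used here to stop the sketch
            break
        subscriber_queue.put(msg)


# Started once at startup, not per notification.
worker = gevent.spawn(notifier)

# Elsewhere (for example in a request handler): hand over a message.
work_queue.put({"type": "users", "data": ["alice"]})

# The streaming side blocks here; the hub switches to the notifier,
# which delivers the message and wakes this get().
event = subscriber_queue.get()

# Shut the notifier down cleanly.
work_queue.put(None)
worker.join()
```

Note that the greenlet has to live in the same hub/thread as the code calling get(), and that this sketch only works because a message is queued before the consumer blocks. If every greenlet is waiting on an empty queue and no timer or socket is pending, the hub still has nothing to run and the same LoopExit is raised.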