I've been trying to integrate event streaming into my Flask application for the past few days. It works well in local testing, but not when running the application with uWSGI on my server. My code is basically built upon the example from Flask. I'm using Python 3.4.2.
The problem
When running the app on my uWSGI server, it raises gevent.hub.LoopExit: 'This operation would block forever' whenever a client tries connecting to the /streaming endpoint. My assumption is that this is caused by calling get() on an empty queue indefinitely.
Full traceback:
Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/werkzeug/wsgi.py", line 691, in __next__
    return self._next()
  File "/usr/lib/python3/dist-packages/werkzeug/wrappers.py", line 81, in _iter_encoded
    for item in iterable:
  File "./voting/__init__.py", line 49, in gen
    result = queue.get(block=True)
  File "/usr/local/lib/python3.4/dist-packages/gevent/queue.py", line 284, in get
    return self.__get_or_peek(self._get, block, timeout)
  File "/usr/local/lib/python3.4/dist-packages/gevent/queue.py", line 261, in __get_or_peek
    result = waiter.get()
  File "/usr/local/lib/python3.4/dist-packages/gevent/hub.py", line 878, in get
    return self.hub.switch()
  File "/usr/local/lib/python3.4/dist-packages/gevent/hub.py", line 609, in switch
    return greenlet.switch(self)
gevent.hub.LoopExit: ('This operation would block forever', <Hub at 0x7f717f40f5a0 epoll default pending=0 ref=0 fileno=6>)
My code
The /streaming endpoint:
@app.route("/streaming", methods=["GET", "OPTIONS"])
def streaming():
    def gen():
        queue = Queue()
        subscriptions.add_subscription(session_id, queue)
        try:
            while True:
                result = queue.get()  # Where the Exception is raised
                ev = ServerSentEvent(json.dumps(result["data"]), result["type"])
                yield ev.encode()
        except GeneratorExit:  # TODO Need a better method to detect disconnecting
            subscriptions.remove_subscription(session_id, queue)
    return Response(gen(), mimetype="text/event-stream")
Adding an event to the queue:
def notify():
    msg = {"type": "users", "data": db_get_all_registered(session_id)}
    subscriptions.add_item(session_id, msg)  # Adds the item to the relevant queues.

gevent.spawn(notify)
As previously said, it runs fine locally with werkzeug:
from app import app
from gevent.wsgi import WSGIServer
from werkzeug.debug import DebuggedApplication
a = DebuggedApplication(app, evalex=True)
server = WSGIServer(("", 5000), a)
server.serve_forever()
What I've tried
- Monkey-patching with monkey.patch_all().
- Switching from Queue to JoinableQueue.
- gevent.sleep(0) in combination with Queue.get().
That exception basically means that there are no other greenlets running in that loop/thread to switch to. So when the greenlet goes to block (queue.get()), the hub has nowhere else to go, nothing else to do.
The same code would work in gevent's WSGIServer because the server itself is a greenlet that's running the socket.accept loop, so there's always another greenlet to switch to. But apparently uWSGI doesn't work that way.
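To make the difference concrete, here is a minimal sketch outside of Flask and uWSGI. The function names blocks_forever and wakes_up are made up for illustration. The first calls get() on an empty queue when nothing else is scheduled, which should raise LoopExit. The second does the same get() while another greenlet is scheduled to call put().

```python
import gevent
from gevent.hub import LoopExit
from gevent.queue import Queue


def blocks_forever():
    """get() on an empty queue with no other greenlet or timer pending."""
    queue = Queue()
    try:
        queue.get()  # nobody can ever put(), so the hub has nothing to run
    except LoopExit:
        return True
    return False


def wakes_up():
    """Same get(), but a second greenlet is scheduled to call put()."""
    queue = Queue()
    # spawn_later gives the hub something to switch to while we block.
    gevent.spawn_later(0.01, queue.put, "hello")
    return queue.get()


if __name__ == "__main__":
    print("LoopExit raised:", blocks_forever())
    print("received:", wakes_up())
```

The only difference between the two functions is whether the hub has something else to run while the caller is blocked.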
The way to fix this is to arrange for there to be other greenlets running. For example, instead of spawning a greenlet to notify on demand, arrange for such a greenlet to already be running and blocking on its own queue.
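One possible shape for that suggestion is sketched below, assuming a single notifier greenlet started once at import time. The names subscribers, pending, notifier and the two-argument notify are hypothetical stand-ins for the question's subscriptions object, not its real API. Whether this alone is enough under uWSGI depends on how the server drives the gevent loop, so treat it as a starting point rather than a verified fix.

```python
import gevent
from gevent.queue import Queue

# Hypothetical stand-ins for the question's `subscriptions` registry.
subscribers = {}   # session_id -> list of per-client queues
pending = Queue()  # work queue the notifier greenlet blocks on


def notifier():
    """Long-lived greenlet: forwards each pending message to its clients."""
    while True:
        session_id, msg = pending.get()  # blocks until notify() is called
        for client_queue in subscribers.get(session_id, []):
            client_queue.put(msg)


def notify(session_id, msg):
    """Called from request handlers instead of gevent.spawn(notify)."""
    pending.put((session_id, msg))


# Start the notifier once, so it already exists when clients connect.
notifier_greenlet = gevent.spawn(notifier)
```

A request handler would then call something like notify(session_id, {"type": "users", "data": ...}) and the streaming generator would keep reading from its own per-client queue as before.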