I'm trying to code a simple program based on Asyncio and a Publish/Subscribe design pattern implemented with ZeroMQ. The publisher has 2 coroutines; one that listens for incoming subscriptions, and another one that publishes the value (obtained via an HTTP request) to the subscriber. The subscriber subscribes to a specific parameter (the name of a city in this case), and waits for the value (the temperature in this city).
Here is my code:
publisher.py
#!/usr/bin/env python
import json
import aiohttp
import aiozmq
import asyncio
import zmq
class Publisher:
BIND_ADDRESS = 'tcp://*:10000'
def __init__(self):
self.stream = None
self.parameter = ""
@asyncio.coroutine
def main(self):
self.stream = yield from aiozmq.create_zmq_stream(zmq.XPUB, bind=Publisher.BIND_ADDRESS)
tasks = [
asyncio.async(self.subscriptions()),
asyncio.async(self.publish())]
print("before wait")
yield from asyncio.wait(tasks)
print("after wait")
@asyncio.coroutine
def subscriptions(self):
print("Entered subscriptions coroutine")
while True:
print("New iteration of subscriptions loop")
received = yield from self.stream.read()
first_byte = received[0][0]
self.parameter = received[0][-len(received[0])+1:].decode("utf-8")
# Subscribe request
if first_byte == 1:
print("subscription request received for parameter "+self.parameter)
# Unsubscribe request
elif first_byte == 0:
print("Unsubscription request received for parameter "+self.parameter)
@asyncio.coroutine
def publish(self):
print("Entered publish coroutine")
while True:
if self.parameter:
print("New iteration of publish loop")
# Make HTTP request
url = "http://api.openweathermap.org/data/2.5/weather?q="+self.parameter
response = yield from aiohttp.request('GET', url)
assert response.status == 200
content = yield from response.read()
# Decode JSON string
decoded_json = json.loads(content.decode())
# Get parameter value
value = decoded_json["main"]["temp"]
# Publish fetched values to subscribers
message = bytearray(self.parameter+":"+str(value),"utf-8")
print(message)
pack = [message]
print("before write")
yield from self.stream.write(pack)
print("after write")
yield from asyncio.sleep(10)
test = Publisher()
loop = asyncio.get_event_loop()
loop.run_until_complete(test.main())
subscriber.py
#!/usr/bin/env python
import zmq
class Subscriber:
XSUB_CONNECT = 'tcp://localhost:10000'
def __init__(self):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.XSUB)
self.socket.connect(Subscriber.XSUB_CONNECT)
def loop(self):
print(self.socket.recv())
self.socket.close()
def subscribe(self, parameter):
self.socket.send_string('\x01'+parameter)
print("Subscribed to parameter "+parameter)
def unsubscribe(self, parameter):
self.socket.send_string('\x00'+parameter)
print("Unsubscribed to parameter "+parameter)
test = Subscriber()
test.subscribe("London")
while True:
print(test.socket.recv())
And here is the output :
Subscriber side :
$ python3 subscriber.py
Subscribed to parameter London
b'London:288.15'
Publisher side :
$ python3 publisher.py
before wait
Entered subscriptions coroutine
New iteration of subscriptions loop
Entered publish coroutine
subscription request received for parameter London
New iteration of subscriptions loop
New iteration of publish loop
bytearray(b'London:288.15')
before write
And the program is stuck there.
As you can see, "before write"
appears in the output and the message is sent, but "after write"
doesn't appear. So, I figured that an exception was probably raised and caught somewhere in the self.stream.write(pack)
call stack.
If I send a KeyboardInterrupt
to the Publisher, here is what I get:
Traceback (most recent call last):
File "publisher.py", line 73, in <module>
loop.run_until_complete(test.main())
File "/usr/lib/python3.4/asyncio/base_events.py", line 304, in run_until_complete
self.run_forever()
File "/usr/lib/python3.4/asyncio/base_events.py", line 276, in run_forever
self._run_once()
File "/usr/lib/python3.4/asyncio/base_events.py", line 1136, in _run_once
event_list = self._selector.select(timeout)
File "/usr/lib/python3.4/selectors.py", line 432, in select
fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt
Task exception was never retrieved
future: <Task finished coro=<publish() done, defined at publisher.py:43> exception=TypeError("'NoneType' object is not iterable",)>
Traceback (most recent call last):
File "/usr/lib/python3.4/asyncio/tasks.py", line 236, in _step
result = coro.send(value)
File "publisher.py", line 66, in publish
yield from self.stream.write(pack)
TypeError: 'NoneType' object is not iterable
Task was destroyed but it is pending!
task: <Task pending coro=<subscriptions() running at publisher.py:32> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:399]>
So I guess my problem actually is this error: TypeError: 'NoneType' object is not iterable
, but I have no clue what's causing it.
What is going wrong here?
The issue is that you're trying to
yield from
the call toself.stream.write()
, butstream.write
isn't actually a coroutine. When you callyield from
on an item, Python internally callsiter(item)
. In this case, the call towrite()
is returningNone
, so Python is trying to doiter(None)
- hence the exception you see.To fix it, you should just call
write()
like a normal function. If you want to actually wait until thewrite
is flushed and sent to the reader, useyield from stream.drain()
after you make the call towrite()
:Also, to make sure that exception in
publish
get raised without needing to Ctrl+C, useasyncio.gather
instead ofasyncio.wait
:With
asyncio.gather
, any exception thrown by a task insidetasks
will be re-raised.