Python Asyncio streaming API

2.1k views Asked by At

I'm looking for some 'beyond basic' guidance on usage patterns for the StreamReader and StreamWriter classes in the Python asyncio package.

I am attempting to build a stateful server with a custom protocol using protobuf. Should I be sub-classing the StreamReader and StreamWriter to manage the serialization from protobuf bytes? I could then provide a read_message function on the reader. I know I can copy the code from streams.start_server providing my own StreamReader, but how do I set my StreamWriter?

Any pointers or examples gratefully received.

2

There are 2 answers

0
MarkNS On

I found it relatively straightforward to subclass the asyncio.streams library classes.

The start_server function is lifted from the tcp server example:

@asyncio.coroutine
def start_server(self, loop):
    def factory():
        reader = QbpStreamReader()
        return QbpStreamReaderProtocol(reader, self._accept_client)

    logger.info("QbpServer starting at tcp://%s:%s", self.host, self.port)
    self.server = yield from loop.create_server(factory, self.host, self.port)

It was necessary to subclass StreamReaderProtocol in order to construct my own StreamWriter. Other than that this is the same as the library function.

class QbpStreamReaderProtocol(streams.StreamReaderProtocol):
    def connection_made(self, transport):
        self._stream_reader.set_transport(transport)
        if self._client_connected_cb is not None:
            self._stream_writer = QbpStreamWriter(transport, self,
                                                  self._stream_reader,
                                                  self._loop)
            res = self._client_connected_cb(self._stream_reader,
                                            self._stream_writer)
            if coroutines.iscoroutine(res):
                self._loop.create_task(res)

For outgoing messages:

class QbpStreamWriter(streams.StreamWriter):
    def write_msg(self, msg):
        # data = serialise message
        self.write(data)

And for incoming messages:

class QbpStreamReader(streams.StreamReader):
    @asyncio.coroutine
    def read_msg(self):
        data = yield from self.readexactly(header_length)
        # msg_type, msg_length = unpack header
        data = yield from self.readexactly(msg_length)
        return build_message(msg_type, data)

Hope it helps someone

1
Andrew Svetlov On

I suggest instead of deriving from StreamReader/StreamWriter invite your own class(es) with similar API. Say, I did it for aiozmq library: https://github.com/aio-libs/aiozmq/blob/master/aiozmq/stream.py