Issues working with Python generators and the OpenStack Swift client


I'm having a problem with Python generators while working with the OpenStack Swift client library.

I am trying to retrieve a large string of data (about 7MB) from a specific URL, split the string into smaller chunks, and return a generator that yields one chunk per iteration. In the test suite, this is just a string that's sent to a monkeypatched class of the Swift client for processing.

The metaclass used for monkeypatching looks like this:

def monkeypatch_class(name, bases, namespace):
    '''Guido's monkeypatch metaclass.'''
    assert len(bases) == 1, "Exactly one base class required"
    base = bases[0]
    for name, value in namespace.iteritems():
        if name != "__metaclass__":
            setattr(base, name, value)
    return base

And in the test suite:

from swiftclient import client
import StringIO
import utils

class Connection(client.Connection):
    __metaclass__ = monkeypatch_class

    def get_object(self, path, obj, resp_chunk_size=None, ...):
        contents = None
        headers = {}

        # retrieve content from path and store it in 'contents'
        ...

        if resp_chunk_size is not None:
            # stream the string into chunks
            def _object_body():
                stream = StringIO.StringIO(contents)
                buf = stream.read(resp_chunk_size)
                while buf:
                    yield buf
                    buf = stream.read(resp_chunk_size)
            contents = _object_body()
        return headers, contents

The returned generator object is then consumed through a stream function in the storage class:

class SwiftStorage(Storage):

    def get_content(self, path, chunk_size=None):
        path = self._init_path(path)
        try:
            _, obj = self._connection.get_object(
                self._container,
                path,
                resp_chunk_size=chunk_size)
            return obj
        except Exception:
            raise IOError("Could not get content: {}".format(path))

    def stream_read(self, path):
        try:
            return self.get_content(path, chunk_size=self.buffer_size)
        except Exception:
            raise OSError(
                "Could not read content from stream: {}".format(path))

And finally, in my test suite:

def test_stream(self):
    filename = self.gen_random_string()
    # test 7MB
    content = self.gen_random_string(7 * 1024 * 1024)
    # wrap the content in a file-like object for stream_write
    io = StringIO.StringIO(content)
    self._storage.stream_write(filename, io)
    io.close()
    # test read / write
    data = ''
    for buf in self._storage.stream_read(filename):
        data += buf
    self.assertEqual(content,
                     data,
                     "stream read failed. output: {}".format(data))

The test fails with this output:

======================================================================
FAIL: test_stream (test_swift_storage.TestSwiftStorage)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/bacongobbler/git/github.com/bacongobbler/docker-registry/test/test_local_storage.py", line 46, in test_stream
    "stream read failed. output: {}".format(data))
AssertionError: stream read failed. output: <generator object _object_body at 0x2a6bd20>

I tried isolating this with a simple Python script that follows the same flow as the code above, but it ran without issues:

def gen_num():
    def _object_body():
        for i in range(10000000):
            yield i
    return _object_body()

def get_num():
    return gen_num()

def stream_read():
    return get_num()

def main():
    num = 0
    for i in stream_read():
        num += i
    print num

if __name__ == '__main__':
    main()

Any help with this issue is greatly appreciated :)

Best answer, by Max Noel:

In your get_object method, you're assigning the return value of _object_body() to the contents variable. However, that variable is also the one that holds your actual data, and it's used early on in _object_body.

The problem is that _object_body is a generator function (it uses yield). Calling it produces a generator object, but none of the function's code runs until you iterate over that generator. So by the time the function's code actually starts running (in the for loop in test_stream), you have long since reassigned contents = _object_body().

Your stream = StringIO.StringIO(contents) therefore builds the StringIO object from the generator object, not from the data. Python 2's StringIO.StringIO converts a non-string argument with str(), which is why the text "<generator object _object_body at 0x2a6bd20>" shows up as the data in your error message.

Here's a minimal reproduction case that illustrates the problem:

def foo():
    contents = "Hello!"

    def bar():
        print contents
        yield 1

    # Only create the generator. This line runs none of the code in bar.
    contents = bar()

    print "About to start running..."
    for i in contents:
        # Now we run the code in bar, but contents is now bound to
        # the generator object. So this doesn't print "Hello!"
        pass

foo()
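
One possible fix, as a rough sketch rather than a drop-in patch: don't let the generator read the same variable you are about to rebind. Either give the generator object a different name, or pass the data into the generator function as an argument, since arguments are bound when the function is called rather than when the body first runs. The example below takes the second route. It is written for Python 3 and uses io.BytesIO, and both iter_chunks and the simplified get_object signature are hypothetical stand-ins for illustration, not part of swiftclient.

```python
import io


def iter_chunks(data, chunk_size):
    """Yield successive pieces of data (bytes), each at most chunk_size long."""
    stream = io.BytesIO(data)
    buf = stream.read(chunk_size)
    while buf:
        yield buf
        buf = stream.read(chunk_size)


def get_object(data, resp_chunk_size=None):
    """Hypothetical stand-in for the patched method; data is passed in directly."""
    headers = {}
    contents = data
    if resp_chunk_size is not None:
        # data is bound as an argument right here, at call time, so
        # rebinding contents afterwards cannot change what the
        # generator reads once iteration starts.
        contents = iter_chunks(data, resp_chunk_size)
    return headers, contents


payload = b"x" * 10 + b"y" * 5
_, body = get_object(payload, resp_chunk_size=4)
chunks = list(body)
print(chunks)
print(b"".join(chunks) == payload)
```

Keeping the nested function but assigning its result to a new name (for example body = _object_body() followed by return headers, body) should work as well, because contents then still refers to the data when the generator finally runs.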