multiprocessing - Pool.imap is consuming my iterator


I have a very large iterator that returns massive amounts of data (file contents), so consuming it eagerly eats up all my RAM within seconds. Python's multiprocessing.Pool().imap(...) claims to iterate lazily. As I understand it, that means it gets a value from the iterator, passes it on to a worker, and then waits for a worker to finish before fetching the next value. This would be exactly what I want.

However, for some reason it keeps retrieving values from the iterator even when the maximum number of workers is already running. This is my code:

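import os
import scandir  # third-party backport of os.scandir for Python 2
from multiprocessing.pool import ThreadPool

class FileNameIterator(object): # Support class for TupleIterator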
    def __init__(self,path):
        self.scanner = scandir.scandir(path)

    def __iter__(self):
        return self

    def next(self):
        while True:
            direntry = self.scanner.next()
            if os.path.isfile(direntry.path):
                return direntry.name

class ContentIterator(object): # Support class for TupleIterator
    def __init__(self,filenameiter,path):
        self.filenameiter = filenameiter
        self.path = path

    def __iter__(self):
        return self

    def next(self):
        print "<iter> reading content." # TODO: remove
        # The with block closes the file on exit, so no explicit close() is needed
        with open(os.path.join(self.path, self.filenameiter.next())) as f:
            return f.read()

class TupleIterator(object): # Basically izip with working __len__
    def __init__(self,path):
        self.fnameiter = FileNameIterator(path)
        self.cntiter = ContentIterator(FileNameIterator(path),path)
        self.path = path

    def __iter__(self):
        return self

    def next(self):
        return self.fnameiter.next(), self.cntiter.next()

    def __len__(self):
        return len([name for name in os.listdir(self.path) if os.path.isfile(os.path.join(self.path, name))])


pool = ThreadPool(12)      # Has same API as Pool(), but allows memory sharing
itr = TupleIterator(_path) # Basically izip with working __len__
with open(_datafile, 'w') as datafile: # Workers write results to a file
    pool.imap_unordered(worker, itr, len(itr) / 12) # This does not work as expected
    pool.close()
    pool.join()
    # datafile is closed automatically when the with block exits

I have the workers printing a message when they start and finish, and the iterator printing when it reads a file. This shows that the iterator continuously reads files way faster than the workers can process them.
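The behaviour can be reproduced without any files. The following is a minimal sketch in Python 3 syntax (my real code above is Python 2); `demo_lookahead`, `source` and `slow_worker` are names made up for this illustration. It counts how many items have been pulled from the iterator by the time the first result comes back.

```python
import time
from multiprocessing.pool import ThreadPool


def demo_lookahead(n_items=50, n_workers=2, delay=0.05):
    """Return how many items had been pulled from the source iterator
    at the moment the first result became available."""
    pulled = []  # one entry per item the pool has requested

    def source():
        for i in range(n_items):
            pulled.append(i)  # record every pull made by the pool
            yield i

    def slow_worker(x):
        time.sleep(delay)  # stand-in for slow processing of a file
        return x

    pool = ThreadPool(n_workers)
    try:
        results = pool.imap_unordered(slow_worker, source())
        next(results)                     # wait for the first finished task
        pulled_at_first_result = len(pulled)
        for _ in results:                 # drain the remaining results
            pass
    finally:
        pool.close()
        pool.join()
    return pulled_at_first_result


if __name__ == "__main__":
    print("items pulled before first result:", demo_lookahead())
```

With only two workers I would expect roughly two items to have been pulled at that point, but on my understanding of CPython the reported number is much larger, which matches what I see with the real files.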

How do I fix this? Or is imap(...) working as intended, and I'm just misunderstanding how it is supposed to work?
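One workaround that might apply here is to throttle the iterator with a semaphore, so the pool can only pull a new item once an earlier one has finished. This is a sketch under assumptions, not a confirmed fix: `bounded`, `run_bounded` and `max_pending` are hypothetical names, it is written in Python 3 syntax, and it relies on the workers being threads (as with ThreadPool) so that they can share a `threading.Semaphore`.

```python
import threading
from multiprocessing.pool import ThreadPool


def bounded(iterable, semaphore):
    """Yield items from `iterable`, taking a permit before each pull."""
    it = iter(iterable)
    while True:
        semaphore.acquire()          # blocks while too many items are in flight
        try:
            item = next(it)
        except StopIteration:
            semaphore.release()      # give back the permit taken for this pull
            return
        yield item


def run_bounded(func, iterable, n_workers=12, max_pending=24):
    """Run `func` over `iterable` with at most `max_pending` items
    pulled from the iterator but not yet processed."""
    semaphore = threading.Semaphore(max_pending)

    def guarded(item):
        try:
            return func(item)
        finally:
            semaphore.release()      # free a permit even if func raises

    pool = ThreadPool(n_workers)
    try:
        # Collects every return value; fine if workers return small values
        return list(pool.imap_unordered(guarded, bounded(iterable, semaphore)))
    finally:
        pool.close()
        pool.join()
```

In my case this would be called as something like `run_bounded(worker, itr)`, leaving out the chunksize argument, since a chunksize of `len(itr)/12` would by itself group a twelfth of all file contents into a single task.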
