Cleanup of iterators that have not been fully exhausted


My main use of generators is processing rows of CSV files stored on a remote server. They give me a consistent interface for processing the data in those files linearly.

Now, I am using paramiko to access an SFTP server that stores the files - and paramiko has an outstanding issue where it does not properly close the connection unless you close the file itself.

I've got a simple interface for accessing a single file on the SFTP server (this is obviously pseudocode - I am omitting the connection and error handling code, and so on).

def sftp_read_file(filename):
    with paramiko.open(filename) as file_obj:
        for item in csv.reader(file_obj):
            yield item

def csv_append_column(iter_obj, col_name, col_val):
    # emit the header row with the new column's name appended
    yield next(iter_obj) + (col_name, )
    # emit every data row with the new column's value appended
    for item in iter_obj:
        yield item + (col_val, )
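What makes this pattern fragile is that the `with` block inside `sftp_read_file` only exits when the generator is exhausted or finalized. A minimal, SFTP-free sketch of that behavior, using an in-memory file as a stand-in for paramiko's file object:

```python
import io

closed = []

class TrackedFile(io.StringIO):
    """Stand-in for paramiko's file object that records when it is closed."""
    def close(self):
        closed.append(True)
        super().close()

def read_rows(f):
    with f:                      # same shape as sftp_read_file
        for line in f:
            yield line.rstrip("\n")

gen = read_rows(TrackedFile("a\nb\nc\n"))
next(gen)        # generator is now suspended mid-iteration; file still open
assert not closed
gen.close()      # raises GeneratorExit at the yield, exiting the with block
assert closed    # the file's close() ran
```

If `gen.close()` is never called (and the generator is never garbage-collected), the file stays open - which is exactly the paramiko hang described below.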

Let's say I would like to test a set of transformations on the file by running the script for a limited number of rows:

def main():
    for i, item in enumerate(csv_append_column(sftp_read_file('sftp://...'), 'A', 'B')):
        print(item)
        if i > 0 and i % 100 == 0:
            break

The script will exit, but the interpreter will never terminate without SIGINT. What are my possible solutions?

1 Answer

Answered by 2ps:

This isn’t the most elegant solution, but maybe we could build off @tadhg-mcdonald-jensen’s suggestion by wrapping the generator in an object:

class Stoppable(object):
    def __init__(self, fn):
        self.generator = fn

    def __enter__(self):
        return self.generator

    def __exit__(self, type_, value, traceback):
        # close() raises GeneratorExit inside the suspended generator,
        # which unwinds the `with` block in sftp_read_file
        self.generator.close()

And then use it like this:

def main():
    with Stoppable(sftp_read_file('sftp://...')) as reader:
        for i, item in enumerate(csv_append_column(reader, 'A', 'B')):
            print(item)
            if i > 0 and i % 100 == 0:
                break   
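As a side note, the standard library already ships this exact wrapper: `contextlib.closing` calls `.close()` on whatever it wraps when the `with` block exits, so it can stand in for `Stoppable`. A small self-contained demonstration:

```python
from contextlib import closing

cleanup_log = []

def rows():
    try:
        for n in range(10):
            yield n
    finally:
        # stands in for sftp_read_file's with-block cleanup
        cleanup_log.append("closed")

# closing() calls reader.close() on exit, just like Stoppable.__exit__
with closing(rows()) as reader:
    for i, item in enumerate(reader):
        if i >= 2:
            break

assert cleanup_log == ["closed"]
```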

Alternatively, we can wrap the generator itself if we don't need it to stream:

def stopit(fn):
    # exhaust the wrapped generator up front so its with block exits,
    # then replay the buffered rows
    rows = list(fn)
    yield from rows

Now we can call it like:

def main():
    for i, item in enumerate(csv_append_column(stopit(sftp_read_file('...')), 'A', 'B')):
        print(item)
        if i > 0 and i % 100 == 0:
            break   

This will make sure the with block exits and paramiko closes the SFTP connection, but it comes at the expense of reading all of the lines into memory at once.
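A middle ground (my sketch, not part of the original answer): keep streaming but cap the number of rows with `itertools.islice`, closing the source in a `finally` so its cleanup runs whether the cap is reached or the consumer stops early:

```python
from itertools import islice

log = []

def take(gen, n):
    # yield at most n items from gen, then close it so any `with` block
    # suspended inside gen (the SFTP file, in the question) is exited
    try:
        yield from islice(gen, n)
    finally:
        gen.close()

def source():
    try:
        yield from range(1000)        # stands in for the CSV rows
    finally:
        log.append("cleaned up")      # stands in for closing the file

assert list(take(source(), 3)) == [0, 1, 2]
assert log == ["cleaned up"]
```

The explicit `finally: gen.close()` matters because `islice` stops iterating without closing the underlying generator.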