RxPy read csv files and process lines


I want to use RxPY to open a (CSV) file and process it line by line. More precisely, I envision the following steps:

  1. provide a filename to the stream
  2. open the file
  3. read file line by line
  4. remove lines which start with a comment (e.g. # ...)
  5. apply csv reader
  6. filter records matching some criteria
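The six steps above can be sketched without RxPY as a chain of plain Python generators. This is a swapped-in technique, not RxPY; read_lines, drop_comments and pipeline are hypothetical names, and the sketch assumes no quoted CSV field spans several lines (a continuation line starting with "#" would otherwise be dropped).

```python
import csv


def read_lines(filenames):
    # Steps 1-3: take filenames, open each file, yield its lines one by one.
    # The with-block keeps each file open only while its lines are consumed.
    for filename in filenames:
        with open(filename, newline="") as f:
            yield from f


def drop_comments(lines):
    # Step 4: skip comment lines (starting with "#") and blank lines
    for line in lines:
        stripped = line.strip()
        if stripped and not stripped.startswith("#"):
            yield line


def pipeline(filenames, predicate):
    # Step 5: parse the remaining lines as CSV records;
    # csv.reader accepts any iterable of strings, not only file objects
    records = csv.reader(drop_comments(read_lines(filenames)))
    # Step 6: keep only the records that match the criteria
    return (row for row in records if predicate(row))
```

Because each with-block closes its file only after the last line has been pulled through, nothing is ever read from a closed file. In RxPY 1.x the same generator could presumably be handed to Observable.from_ so that the Rx chain carries lines instead of file handles; that wiring is an untested assumption.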

So far I have:

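import csv
from rx import Observable
from rx.disposables import AnonymousDisposable

def to_file(filename):
    f = open(filename)
    # Dispose of the resource (i.e. close the file) when the subscription ends
    return Observable.using(
        lambda: AnonymousDisposable(lambda: f.close()),
        lambda d: Observable.just(f)
    )

def to_reader(f):
    return csv.reader(f)

def print_rows(reader):
    for row in reader:
        print(row)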

This works:

(Observable.from_(["filename.csv", "filename2.csv"])
    .flat_map(to_file)
    .map(to_reader)  # map (works)
    .subscribe(print_rows))

This doesn't work; it raises ValueError: I/O operation on closed file

(Observable.from_(["filename.csv", "filename2.csv"])
    .flat_map(to_file)
    .flat_map(to_rows)  # flat_map (fails); to_rows is not shown, assumed to wrap csv.reader(f)
    .subscribe(print))

The second one fails because of the following (see https://github.com/ReactiveX/RxPY/issues/69):

When the observables from the first flat_map are merged by the second flat_map, the inner subscriptions are disposed as soon as they complete. Thus the files are closed, even though the file handles have been on_next'ed into the new observable set up by the second flat_map.
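The root cause can be reproduced without RxPY: a file object that outlives the scope that opened it is already closed by the time a reader iterates it. A minimal plain-Python sketch; open_then_close and read_rows_safely are hypothetical names, and the with-block stands in for the disposal that Observable.using performs.

```python
import csv


def open_then_close(path):
    # Mimics the failing chain: the file handle escapes the scope that owns it,
    # and that scope closes the file before anyone reads from it.
    with open(path, newline="") as f:
        return f  # the with-block closes f on return


def read_rows_safely(path):
    # Reading inside the owning scope: every row is consumed before the close.
    with open(path, newline="") as f:
        return list(csv.reader(f))
```

Calling csv.reader on the handle returned by open_then_close raises ValueError: I/O operation on closed file, the same error as in the question, while read_rows_safely returns the rows. The general fix is the same in Rx: do the reading inside the operator that owns the file, and emit rows rather than the handle.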

Any idea how I can achieve something like this?

(Observable.from_(["filename.csv", "filename2.csv"])
    .flat_map(to_file)
    .filter(comment_lines)
    .filter(empty_lines)
    .map(to_csv_reader)
    .filter(filter_by..)
    .subscribe(...))  # do whatever

Thanks a lot for your help

Juergen

Answer by Tony Piazza

I just started working with RxPY recently and needed to do the same thing. I was surprised nobody had answered your question yet, so I decided to answer in case someone else needs to know. Assuming you have a CSV file like this:

$ cat datafile.csv
"iata","airport","city","state","country","lat","long"
"00M","Thigpen ","Bay Springs","MS","USA",31.95376472,-89.23450472
"00R","Livingston Municipal","Livingston","TX","USA",30.68586111,-95.01792778
"00V","Meadow Lake","Colorado Springs","CO","USA",38.94574889,-104.5698933
"01G","Perry-Warsaw","Perry","NY","USA",42.74134667,-78.05208056
"01J","Hilliard Airpark","Hilliard","FL","USA",30.6880125,-81.90594389

Here is a solution:

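from csv import DictReader
from rx import Observable

# newline='' is what the csv docs recommend for files given to a reader; the
# with-block closes the file once the synchronous subscription has finished
with open('datafile.csv', 'r', newline='') as f:
    Observable.from_(DictReader(f)) \
              .subscribe(lambda row:
                         print("{0:3}\t{1:<35}".format(row['iata'], row['airport'][:35]))
              )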
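The comment filtering and record criteria from the question fit in front of and behind DictReader, because csv.DictReader accepts any iterable of strings, not only a file object. Filtering first also matters because DictReader treats the first row it sees as the header. A minimal sketch; airports_in_state and the state criterion are hypothetical examples using the column names from the sample data above.

```python
import csv


def airports_in_state(lines, state):
    # Drop comment and blank lines before DictReader sees them,
    # so a leading "#" line is not mistaken for the header row
    content = (line for line in lines
               if line.strip() and not line.lstrip().startswith("#"))
    for row in csv.DictReader(content):
        # Criteria filter (hypothetical example): keep airports in one state
        if row["state"] == state:
            yield row["iata"], row["airport"].strip()
```

With the real file, the generator has to be consumed while the file is still open, for example inside with open('datafile.csv', newline='') as f: followed by list(airports_in_state(f, 'TX')). Inside that block the generator could also be passed to Observable.from_ as in the answer.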