I want to use RxPy to open a (CSV) file and process it line by line. More precisely, I envision the following steps:
- provide a filename to the stream
- open the file
- read file line by line
- remove lines which start with a comment (e.g. # ...)
- apply csv reader
- filter records matching some criteria
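To pin down what each step should do, the steps above can be sketched with plain Python generators instead of Rx. This is only an illustration under assumptions: the helper names (`read_lines`, `drop_comments`, `matching_records`, `demo_pipeline`), the sample data, and the "age >= 18" criterion are hypothetical and not part of RxPy.

```python
import csv
import os
import tempfile


def read_lines(filename):
    # Keep the file open for exactly as long as lines are being consumed.
    with open(filename, newline="") as f:
        for line in f:
            yield line


def drop_comments(lines, marker="#"):
    # Skip lines whose first non-blank characters are the comment marker.
    return (line for line in lines if not line.lstrip().startswith(marker))


def matching_records(filename, keep):
    # csv.reader accepts any iterable of strings, not only file objects.
    rows = csv.reader(drop_comments(read_lines(filename)))
    # Blank lines come back as empty rows; drop them before applying `keep`.
    return [row for row in rows if row and keep(row)]


def demo_pipeline():
    # Write a small throwaway CSV so the sketch runs on its own.
    with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as tmp:
        tmp.write("# a comment\nalice,30\n\nbob,17\n")
    try:
        return matching_records(tmp.name, lambda row: int(row[1]) >= 18)
    finally:
        os.remove(tmp.name)


if __name__ == "__main__":
    print(demo_pipeline())
```

The point of the sketch is the ordering: comment lines are removed before the CSV reader sees them, and the record filter runs on parsed rows.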
So far I have:
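import csv

from rx import Observable
from rx.disposables import AnonymousDisposable

def to_file(filename):
    f = open(filename)
    # Close the file when the subscription to this observable is disposed.
    return Observable.using(
        lambda: AnonymousDisposable(lambda: f.close()),
        lambda d: Observable.just(f)
    )

def to_reader(f):
    return csv.reader(f)

def print_rows(reader):
    for row in reader:
        print(row)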
This works (using map for the second step):
Observable.from_(["filename.csv", "filename2.csv"]) \
    .flat_map(to_file).map(to_reader).subscribe(print_rows)
This doesn't (using flat_map for the second step); it raises ValueError: I/O operation on closed file:
Observable.from_(["filename.csv", "filename2.csv"]) \
    .flat_map(to_file).flat_map(to_rows).subscribe(print)
The second version doesn't work for the following reason (see https://github.com/ReactiveX/RxPY/issues/69):
When the observables from the first flatmap is merged by the second flatmap, the inner subscriptions will be disposed when they complete. Thus the files will be closed, even if the file handles are on_next'ed into the new observable set up by the second flatmap.
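A plain-Python analogy of this lifetime problem, without Rx, may make it easier to see. It is a sketch, not RxPy code: the `with` block stands in for the disposable resource, and the function names (`emit_handle`, `emit_lines`, `demo_lifetime`) are hypothetical. Handing the file object out of the resource scope fails, while producing the lines inside the scope works.

```python
import os
import tempfile


def emit_handle(path):
    # Analogue of emitting the file object itself: the resource scope ends
    # (and the file is closed) before the consumer gets to read from it.
    with open(path) as f:
        return f


def emit_lines(path):
    # Analogue of emitting lines: reading happens inside the resource scope.
    with open(path) as f:
        for line in f:
            yield line.rstrip("\n")


def demo_lifetime():
    # Write a small throwaway file so the sketch runs on its own.
    with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as tmp:
        tmp.write("a,1\nb,2\n")
    try:
        try:
            emit_handle(tmp.name).read()
            handle_error = None
        except ValueError as exc:
            # Reading from the already closed handle raises ValueError.
            handle_error = str(exc)
        return handle_error, list(emit_lines(tmp.name))
    finally:
        os.remove(tmp.name)


if __name__ == "__main__":
    print(demo_lifetime())
```

By the same reasoning, an observable that emits lines rather than the file handle would avoid passing an open resource across the flat_map boundary.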
Any idea how I can achieve something like this?
Observable.from_(["filename.csv", "filename2.csv"]
).flat_map(to_file
).filter(comment_lines
).filter(empty_lines
).map(to_csv_reader
).filter(filter_by.. )
).do whatever
Thanks a lot for your help
Juergen
I started working with RxPy recently and needed to do the same thing. I'm surprised nobody has answered your question yet, so I decided to answer in case someone else needs to know. Assuming you have a CSV file like this:
Here is a solution: