PyTorch DataPipes are a new in-place dataset loading mechanism for large data that can be fed into PyTorch models through streaming. For reference, these are:
- Official Doc: https://pytorch.org/data/main/tutorial.html
- A crash-course post explaining the usage https://sebastianraschka.com/blog/2022/datapipes.html
Given a myfile.csv file, initialised as the csv_file variable in code, the file looks like this:
imagefile,label
train/0/16585.png,0
train/0/56789.png,0
...
In the example code that uses DataPipes to read a csv_file and then create an iterable dataset using torchdata.datapipes, we see something like:
from torchdata import datapipes as dp

def build_data_pipe(csv_file, transform, len=1000, batch_size=32):
    new_dp = dp.iter.FileOpener([csv_file])
    new_dp = new_dp.parse_csv(skip_lines=1)
    # returns tuples like ('train/0/16585.png', '0')
    new_dp = new_dp.shuffle(buffer_size=len)
    ...
    # More code that returns the `new_dp` variable, which looks like some
    # lazily evaluated, unmaterialized Iterable object.
    return new_dp
If we look at each step and what is returned to new_dp, we see:
>>> from torchdata import datapipes as dp
# The first call initializes a FileOpenerIterDataPipe object
>>> new_dp = dp.iter.FileOpener(["myfile.csv"])
>>> new_dp
FileOpenerIterDataPipe
# Then after that, the DataPipes API allows some overriding/subclassing
# by calling a partial function, e.g.
>>> new_dp.parse_csv
functools.partial(<function IterDataPipe.register_datapipe_as_function.<locals>.class_function at 0x213123>, <class 'torchdata.datapipes.iter.util.plain_text_reader.CSVParserIterDataPipe'>, False, FileOpenerIterDataPipe)
>>> new_dp = new_dp.parse_csv(skip_lines=1)
>>> new_dp
CSVParserIterDataPipe
It looks like new_dp.parse_csv(skip_lines=1) is trying to do a new initialization through a MixIn between CSVParserIterDataPipe and FileOpenerIterDataPipe, but I'm not exactly sure what's happening.
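For what it's worth, the functional_datapipe decorator in torch.utils.data seems to be how these names get registered; a minimal sketch of that pattern (keep_rows is a made-up name, purely for illustration):

from torch.utils.data import IterDataPipe, functional_datapipe

@functional_datapipe("keep_rows")  # hypothetical name, for illustration only
class KeepRowsIterDataPipe(IterDataPipe):
    def __init__(self, source_datapipe, predicate):
        # The previous pipe is passed in as the first argument,
        # so this looks like composition (wrapping), not a MixIn
        self.source_datapipe = source_datapipe
        self.predicate = predicate

    def __iter__(self):
        for row in self.source_datapipe:
            if self.predicate(row):
                yield row

# Every IterDataPipe now gains a .keep_rows(...) method that returns a
# KeepRowsIterDataPipe wrapping it, which would explain why new_dp.parse_csv
# shows up as a functools.partial bound to CSVParserIterDataPipe.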
To get a fully working datapipe, there's a whole bunch of other new_dp = new_dp.xxx() calls to make. And my questions are:
Q1. Can't the DataPipe be initialized in a non-sequential way? (P/S: This didn't work as expected)
from torchdata import datapipes as dp

class MyDataPipe(dp.iterGenericDataPipe):
    def __init__(self, csv_file, skip_lines=1, shuffle_buffer=1000):
        super().__init__([csv_file])
        self.parse_csv(skip_lines=skip_lines)
        self.shuffle(buffer_size=shuffle_buffer)
But given that we have to overwrite new_dp, it seems like we might have to do something like:
from torchdata import datapipes as dp

class MyDataPipe(dp.iterGenericDataPipe):
    def __init__(self, csv_file, skip_lines=1, shuffle_buffer=1000):
        super().__init__([csv_file])
        self = self.parse_csv(skip_lines=skip_lines)
        self = self.shuffle(buffer_size=shuffle_buffer)
It looks like you're trying to chain together a series of torch DataPipes, namely:
- open_files
- parse_csv
- shuffle
The official torchdata tutorial at https://pytorch.org/data/0.4/tutorial.html does so using a function (e.g. def custom_data_pipe()), but you seem to prefer a class-based approach (e.g. class CustomDataPipe). Let's call this a DataPipeLine.
An additional complication is that you're trying to apply an inheritance-style torch.utils.data.Dataset to a composition-style torchdata.datapipes.iter.IterDataPipe. Presumably, the reason you're doing this is to create a configurable 'dataset', e.g. one that can skip N lines, has a shuffle buffer of B, etc. Now, there are a few things wrong about this, but let's go with it.
Bad example (please don't use):
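A minimal sketch of what that might look like (assuming torchdata 0.4, and reusing the FileOpener / parse_csv / shuffle steps from the question):

from torchdata.datapipes.iter import FileOpener, IterDataPipe

class DataPipeLine(IterDataPipe):
    """A monolithic 'do everything' DataPipe (the anti-pattern)."""

    def __init__(self, csv_file: str, skip_lines: int = 1, shuffle_buffer: int = 1000):
        super().__init__()
        self.csv_file = csv_file
        self.skip_lines = skip_lines
        self.shuffle_buffer = shuffle_buffer

    def __iter__(self):
        # Rebuilds the entire chain internally on every iteration pass
        dp = FileOpener(datapipe=[self.csv_file])
        dp = dp.parse_csv(skip_lines=self.skip_lines)
        dp = dp.shuffle(buffer_size=self.shuffle_buffer)
        yield from dp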
And the way you would use it is:
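Something along these lines (again, just a sketch):

dp = DataPipeLine(csv_file="myfile.csv", skip_lines=1, shuffle_buffer=100)
for row in dp:
    print(row)  # e.g. ['train/0/16585.png', '0']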
Now to be honest, this is really not recommended (and I'm half regretting writing up this answer already), because the reason torchdata exists is to have compositional DataPipes, i.e. each DataPipe should be specialized to do one thing only rather than many things. Also, you won't be streaming data properly, as the iterator will need to run your data through all 3 functions (open_files, parse_csv, shuffle) per file, instead of doing things piecewise (in a parallelizable way), thus defeating the whole purpose of using torchdata!
What you probably want is to 1) read up more on composition and pipe-ing, and then 2) write something like the below. I'm using a LightningDataModule not only because it's cool, but because it's closer to the thing you actually want to subclass.
Better example:
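A sketch of that idea (assuming pytorch_lightning is installed; CustomDataModule and its parameter names are mine, carried over from the question):

import pytorch_lightning as pl
from torch.utils.data import DataLoader
from torchdata.datapipes.iter import FileOpener

class CustomDataModule(pl.LightningDataModule):
    def __init__(self, csv_file: str, skip_lines: int = 1,
                 shuffle_buffer: int = 1000, batch_size: int = 32):
        super().__init__()
        self.csv_file = csv_file
        self.skip_lines = skip_lines
        self.shuffle_buffer = shuffle_buffer
        self.batch_size = batch_size

    def setup(self, stage=None):
        # Compose by chaining: each DataPipe does one specialized thing
        dp = FileOpener(datapipe=[self.csv_file])
        dp = dp.parse_csv(skip_lines=self.skip_lines)
        dp = dp.shuffle(buffer_size=self.shuffle_buffer)
        self.dp = dp

    def train_dataloader(self):
        # An IterDataPipe is an IterableDataset, so DataLoader accepts it
        return DataLoader(dataset=self.dp, batch_size=self.batch_size)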
Usage:
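Again just a sketch; in a real training run, a pl.Trainer would call setup for you:

datamodule = CustomDataModule(csv_file="myfile.csv")
datamodule.setup()
for batch in datamodule.train_dataloader():
    print(batch)  # batched rows from the CSV
    break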
Maybe not quite the answer you expected, but I'd encourage you to experiment a bit more. The key bit is to switch your mindset from inheritance (subclassing) to composition (chaining/pipe-ing).
P.S. Gonna throw in a shameless plug on some tutorials I wrote at https://zen3geo.readthedocs.io/en/v0.4.0/walkthrough.html. It's a bit geospatial specific, but might be helpful to get a feel of the DataPipe-way of working. Good luck!