apache beam rows to tfrecord in order to GenerateStatistics

I have built a pipeline that reads some data, does some manipulations, and creates some Apache Beam Row objects (steps 1 and 2 in the code below). I would then like to generate statistics and write them to file. I can leverage the TensorFlow Data Validation library for that, but tfdv GenerateStatistics expects a pyarrow.lib.RecordBatch, not a Row object. I am aware that apache_beam.io.tfrecordio.WriteToTFRecord can write a PCollection to file as TFRecord; however, is there a way to do it without writing to file? Ideally a step 3 that would transform a Row object into a TFRecord.

with beam.Pipeline(options=pipeline_options) as pipeline:
    result = ( pipeline  
             | 'Step 1: Read data' >> ...
             | 'Step 2: Do some transformations' >> ...  # Here the resulting objects are beam Rows
             | 'Step 3: transform Rows to TFRecord' >> ...
             | 'Step 4: GenerateStatistics' >> tfdv.GenerateStatistics()
             | 'Step 5: WriteStatsOutput' >> WriteStatisticsToTFRecord(STATS_OUT_FILE)
            )

Without a step 3, my pipeline would generate the following error:

apache_beam.typehints.decorators.TypeCheckError: Input type hint violation at GenerateStatistics: expected <class 'pyarrow.lib.RecordBatch'>, got Row(attribute_1=Any, attribute_2=Any, ..., attribute_n=Any)
Full type hint:
IOTypeHints[inputs=((<class 'pyarrow.lib.RecordBatch'>,), {}), outputs=((<class 'tensorflow_metadata.proto.v0.statistics_pb2.DatasetFeatureStatisticsList'>,), {})]
strip_pcoll_output()

UPDATE: I have worked on implementing step 3 of the pipeline above. GenerateStatistics takes pyarrow RecordBatch objects as input, so I tried to build one from the dictionary I get from the previous steps. I came up with the following:

import apache_beam as beam
import pyarrow

class DictToPyarrow(beam.DoFn):
    def __init__(self, schema: pyarrow.Schema, *unused_args, **unused_kwargs):
        super().__init__(*unused_args, **unused_kwargs)
        self.schema = schema

    def process(self, pydict):
        # Build single-row columns: one list with the value of each feature.
        dict_list = dict()
        for key, val in pydict.items():
            dict_list[key] = [val]
        table = pyarrow.Table.from_pydict(dict_list, schema=self.schema)
        batches = table.to_batches()
        yield batches[0]

with beam.Pipeline(options=pipeline_options) as pipeline:
    result = ( pipeline  
             | 'Step 1: Read data' >> ...
             | 'Step 2: Do some transformations' >> ...  # Here the resulting objects are beam Rows
             | "STEP 3: To pyarrow" >> beam.ParDo(DictToPyarrow(schema))
             | 'Step 4: GenerateStatistics' >> tfdv.GenerateStatistics()
             | 'Step 5: WriteStatsOutput' >> WriteStatisticsToTFRecord(STATS_OUT_FILE)
            )
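
Here, schema is a pyarrow schema describing the expected features. A minimal sketch of what gets passed in (the feature names and types below are illustrative, not from the original post):

import pyarrow

# Illustrative schema; the real one mirrors the Row attributes from step 2.
schema = pyarrow.schema([
    ("my_feature_1", pyarrow.int32()),
    ("my_feature_2", pyarrow.string()),
])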
 

But I get the following error:

TypeError: Expected feature column to be a (Large)List<primitive|struct> or null, but feature my_feature_1 was int32. [while running 'GenerateStatistics/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/ToTopKTuples']
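
The error suggests that TFDV's statistics generators expect every feature column in the RecordBatch to be a (Large)List array, i.e. one list of values per example, rather than a plain primitive column. A possible fix, sketched against the illustrative schema above (list-typed fields, each scalar wrapped in a list):

import pyarrow

# List-typed schema: each example carries a list of values per feature.
schema = pyarrow.schema([
    ("my_feature_1", pyarrow.list_(pyarrow.int32())),
    ("my_feature_2", pyarrow.list_(pyarrow.string())),
])

# Inside DictToPyarrow.process, wrap each scalar in a nested list:
# the outer list holds the rows, the inner list the values of one example.
dict_list = {key: [[val]] for key, val in pydict.items()}
table = pyarrow.Table.from_pydict(dict_list, schema=schema)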

UPDATE 2: I was able to create tf.Example objects from the Beam pipeline, thanks to this. Something like:

with beam.Pipeline(options=pipeline_options) as pipeline:
    result = ( pipeline  
             | 'Step 1: Read data' >> beam.io.ReadFromParquet(gsc_input_file)
             | 'Step2: ToTFExample' >> beam.Map(utils.dict_to_example)
             | "Step3: GenerateStatistics" >> GenerateStatistics(tfdv_options)
)

But I get the following error:

apache_beam.typehints.decorators.TypeCheckError: Input type hint violation at GenerateStatistics: expected <class 'pyarrow.lib.RecordBatch'>, got <class 'tensorflow.core.example.example_pb2.Example'>
Full type hint:
IOTypeHints[inputs=((<class 'pyarrow.lib.RecordBatch'>,), {}), outputs=((<class 'tensorflow_metadata.proto.v0.statistics_pb2.DatasetFeatureStatisticsList'>,), {})]
strip_pcoll_output()

It seems that tensorflow_data_validation's GenerateStatistics wants RecordBatch objects.
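
One way to bridge this gap in memory is to serialize the tf.Example protos, batch them, and decode each batch into a pyarrow RecordBatch with tfx_bsl (which ships alongside TFDV). A sketch under those assumptions; the batching step and the DoFn are illustrative:

import apache_beam as beam
from tfx_bsl.coders import example_coder

class ExamplesToRecordBatch(beam.DoFn):
    """Decodes a batch of serialized tf.Examples into one RecordBatch."""
    def setup(self):
        self._decoder = example_coder.ExamplesToRecordBatchDecoder()

    def process(self, serialized_examples):
        yield self._decoder.DecodeBatch(serialized_examples)

with beam.Pipeline(options=pipeline_options) as pipeline:
    result = ( pipeline
             | 'Step 1: Read data' >> beam.io.ReadFromParquet(gsc_input_file)
             | 'Step 2: ToTFExample' >> beam.Map(utils.dict_to_example)
             | 'Step 3: Serialize' >> beam.Map(lambda ex: ex.SerializeToString())
             | 'Step 4: Batch' >> beam.BatchElements()
             | 'Step 5: ToRecordBatch' >> beam.ParDo(ExamplesToRecordBatch())
             | 'Step 6: GenerateStatistics' >> GenerateStatistics(tfdv_options)
            )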

1 Answer

Answered by Raul Saucedo:

Unfortunately, there is no other way to do it without writing to a file. This step is needed because machine learning frameworks consume training data as a sequence of examples, and these file formats should have easily consumable layouts with no impedance mismatch with the storage platform or the programming language used to read/write the files.

Some advantages of using TFRecords are:

  • Best performance when training your model.
  • Binary data uses less space on disk, takes less time to copy, and can be read much more efficiently from disk.
  • It makes it easier to combine multiple datasets, and it integrates seamlessly with the data import and preprocessing functionality provided by the library, especially for datasets that are too large to fit in memory.

You can find more documentation about TFRecord in the TensorFlow documentation.
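
For completeness, a minimal sketch of the write-then-read flow described above (the output prefix, serialization step, and file pattern are assumptions for illustration):

import apache_beam as beam
import tensorflow_data_validation as tfdv

# First write the serialized tf.Examples to TFRecord files...
with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = ( pipeline
        | 'Read data' >> beam.io.ReadFromParquet(gsc_input_file)
        | 'ToTFExample' >> beam.Map(utils.dict_to_example)
        | 'Serialize' >> beam.Map(lambda ex: ex.SerializeToString())
        | 'WriteTFRecord' >> beam.io.tfrecordio.WriteToTFRecord(
              TFRECORD_PREFIX, file_name_suffix='.tfrecord')
        )

# ...then have TFDV read them back and compute the statistics.
stats = tfdv.generate_statistics_from_tfrecord(
    data_location=TFRECORD_PREFIX + '-*.tfrecord')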