I have built a pipeline that reads some data, does some manipulations, and creates Apache Beam Row objects (Steps 1 and 2 in the code below). I would then like to generate statistics and write them to file. I can leverage the TensorFlow Data Validation (TFDV) library for that, but tfdv.GenerateStatistics expects pyarrow.lib.RecordBatch objects, not Row objects. I am aware that apache_beam.io.tfrecordio.WriteToTFRecord can write a PCollection to file as TFRecord; however, is there a way to do it without writing to file? Ideally, a Step 3 that would transform a Row object into a TFRecord.
with beam.Pipeline(options=pipeline_options) as pipeline:
    result = (
        pipeline
        | 'Step 1: Read data' >> ...
        | 'Step 2: Do some transformations' >> ...  # Here the resulting objects are beam Rows
        | 'Step 3: transform Rows to TFRecord' >> ...
        | 'Step 4: GenerateStatistics' >> tfdv.GenerateStatistics()
        | 'Step 5: WriteStatsOutput' >> WriteStatisticsToTFRecord(STATS_OUT_FILE)
    )
Without Step 3, my pipeline generates the following error:
apache_beam.typehints.decorators.TypeCheckError: Input type hint violation at GenerateStatistics: expected <class 'pyarrow.lib.RecordBatch'>, got Row(attribute_1=Any, attribute_2=Any, ..., attribute_n=Any)
Full type hint:
IOTypeHints[inputs=((<class 'pyarrow.lib.RecordBatch'>,), {}), outputs=((<class 'tensorflow_metadata.proto.v0.statistics_pb2.DatasetFeatureStatisticsList'>,), {})]
strip_pcoll_output()
UPDATE:
I have worked on trying to implement Step 3 in the pipeline above. GenerateStatistics takes pyarrow RecordBatch objects as input, so I tried to build one from the dictionary I get from the previous steps. I came up with the following:
import apache_beam as beam
import pyarrow


class DictToPyarrow(beam.DoFn):
    def __init__(self, schema: pyarrow.Schema, *unused_args, **unused_kwargs):
        super().__init__(*unused_args, **unused_kwargs)
        self.schema = schema

    def process(self, pydict):
        # Wrap each value in a single-element list so that
        # Table.from_pydict sees one row per column.
        dict_list = dict()
        for key, val in pydict.items():
            dict_list[key] = [val]
        table = pyarrow.Table.from_pydict(dict_list, schema=self.schema)
        batches = table.to_batches()
        yield batches[0]
with beam.Pipeline(options=pipeline_options) as pipeline:
    result = (
        pipeline
        | 'Step 1: Read data' >> ...
        | 'Step 2: Do some transformations' >> ...  # Here the resulting objects are beam Rows
        | 'Step 3: To pyarrow' >> beam.ParDo(DictToPyarrow(schema))
        | 'Step 4: GenerateStatistics' >> tfdv.GenerateStatistics()
        | 'Step 5: WriteStatsOutput' >> WriteStatisticsToTFRecord(STATS_OUT_FILE)
    )
But I get the following error:
TypeError: Expected feature column to be a (Large)List<primitive|struct> or null, but feature my_feature_1 was int32. [while running 'GenerateStatistics/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/ToTopKTuples']
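Reading the error, it seems TFDV expects every column of the RecordBatch to be list-typed (one list of values per example), not a plain primitive column. If that is right, the schema would need list-typed fields and each cell would need to be wrapped one level deeper. An untested sketch of that variant (the feature name and the int32 type below are just placeholders):

import apache_beam as beam
import pyarrow


class DictToRecordBatch(beam.DoFn):
    def __init__(self, schema: pyarrow.Schema, *unused_args, **unused_kwargs):
        # Schema with list-typed columns, e.g.:
        # pyarrow.schema([('my_feature_1', pyarrow.list_(pyarrow.int32()))])
        super().__init__(*unused_args, **unused_kwargs)
        self.schema = schema

    def process(self, pydict):
        # One row per batch; each cell is a list of values, so the
        # Arrow column type becomes list<primitive>.
        columns = {key: [[val]] for key, val in pydict.items()}
        table = pyarrow.Table.from_pydict(columns, schema=self.schema)
        yield from table.to_batches()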
UPDATE 2: I was able to create tf.train.Example objects from the beam pipeline, thanks to this. Something like:
with beam.Pipeline(options=pipeline_options) as pipeline:
    result = (
        pipeline
        | 'Step 1: Read data' >> beam.io.ReadFromParquet(gsc_input_file)
        | 'Step 2: ToTFExample' >> beam.Map(utils.dict_to_example)
        | 'Step 3: GenerateStatistics' >> GenerateStatistics(tfdv_options)
    )
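(Here utils.dict_to_example is my own helper that turns a dict into a tf.train.Example proto; roughly along these lines, with the type dispatch simplified:)

import tensorflow as tf


def dict_to_example(instance):
    """Converts a dict of primitives into a tf.train.Example (simplified)."""
    feature = {}
    for key, value in instance.items():
        if isinstance(value, int):
            feature[key] = tf.train.Feature(
                int64_list=tf.train.Int64List(value=[value]))
        elif isinstance(value, float):
            feature[key] = tf.train.Feature(
                float_list=tf.train.FloatList(value=[value]))
        else:
            feature[key] = tf.train.Feature(
                bytes_list=tf.train.BytesList(value=[str(value).encode()]))
    return tf.train.Example(features=tf.train.Features(feature=feature))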
But I get the following error:
apache_beam.typehints.decorators.TypeCheckError: Input type hint violation at GenerateStatistics: expected <class 'pyarrow.lib.RecordBatch'>, got <class 'tensorflow.core.example.example_pb2.Example'>
Full type hint:
IOTypeHints[inputs=((<class 'pyarrow.lib.RecordBatch'>,), {}), outputs=((<class 'tensorflow_metadata.proto.v0.statistics_pb2.DatasetFeatureStatisticsList'>,), {})]
strip_pcoll_output()
It seems that GenerateStatistics from tensorflow_data_validation wants RecordBatch objects.
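The one in-memory bridge I have found so far is the TFXIO interface in tfx_bsl: TFExampleBeamRecord is documented to turn a PCollection of serialized tf.Examples into RecordBatches. An untested sketch of what I am experimenting with (assuming tfx_bsl is installed):

import apache_beam as beam
import tensorflow_data_validation as tfdv
from tfx_bsl.public import tfxio

tfx_io = tfxio.TFExampleBeamRecord(
    physical_format='inmem',
    telemetry_descriptors=['StandaloneTFDV'],
)

with beam.Pipeline(options=pipeline_options) as pipeline:
    result = (
        pipeline
        | 'Step 1: Read data' >> beam.io.ReadFromParquet(gsc_input_file)
        | 'Step 2: ToTFExample' >> beam.Map(utils.dict_to_example)
        | 'Step 3: Serialize' >> beam.Map(lambda ex: ex.SerializeToString())
        | 'Step 4: ToRecordBatch' >> tfx_io.BeamSource()
        | 'Step 5: GenerateStatistics' >> tfdv.GenerateStatistics(tfdv_options)
    )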
Unfortunately, there is no other way to do it without writing to a file. This step is needed because machine learning frameworks consume training data as a sequence of examples, and these file formats should have an easily consumable layout with no impedance mismatch with the storage platform or the programming language used to read/write the files. Some of the advantages of using TFRecords are covered in the TFRecord documentation.
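As a sketch of that flow (OUTPUT_PREFIX is a placeholder path, and examples stands for the PCollection of tf.train.Example produced upstream):

import apache_beam as beam
import tensorflow_data_validation as tfdv

# Inside the pipeline: serialize the examples and write them as TFRecord files.
_ = (
    examples
    | 'Serialize' >> beam.Map(lambda ex: ex.SerializeToString())
    | 'WriteTFRecord' >> beam.io.WriteToTFRecord(
        OUTPUT_PREFIX, file_name_suffix='.tfrecord')
)

# After the pipeline has run, compute statistics directly from the files.
stats = tfdv.generate_statistics_from_tfrecord(
    data_location=OUTPUT_PREFIX + '*')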