Apache Beam: WriteToFiles Based on Filename


This question extends the discussion from a previous post:

Writing to Multiple Files from Single PCollection (Beam-Python)

In short, I am looking to transform some files, and I would like each output file to correspond to its input file. For example:

gcs_files: ['input/gcs_file_1.json', 'input/gcs_file_2.json', 'input/gcs_file_3.json']

Output: ['output/gcs_file_1_transformed.json', 'output/gcs_file_2_transformed.json', 'output/gcs_file_3_transformed.json']

I am trying ReadAllFromText(with_filename=True), which yields tuples in the following format:

record = (<gcs_file_dir>, <json_record>)

I only need to transform <json_record>, but I need <gcs_file_dir> so that the output file can keep the same name as its input.

I know WriteToFiles() can use the destination argument to assign a destination based on record[0].

I have the following figured out:

(p | beam.Create(gcs_files)
   | ReadAllFromText(with_filename=True)
   | beam.Map(transform_func)
   | WriteToFiles(path=<path>, destination=lambda record: record[0]))

However, I only want to write record[1], since that is the actual transformed value. Is there a way to write only part of each element of the PCollection?

Thank you!


1 Answer

Answered by robertwb:

It looks like this is not supported out of the box.

You could write a custom sink (possibly one that delegates to a different FileSink) that strips off the portion you are not interested in writing to the file.
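
A minimal sketch of that approach, assuming the transform keeps the (filename, record) tuple shape. DropKeyTextSink, transform_func, and the output path are illustrative names, not part of the Beam API; only fileio.FileSink, fileio.TextSink, fileio.WriteToFiles, and fileio.destination_prefix_naming come from the SDK:

import apache_beam as beam
from apache_beam.io import ReadAllFromText, fileio


class DropKeyTextSink(fileio.FileSink):
    """Delegates to fileio.TextSink, but writes only record[1]."""

    def __init__(self):
        self._sink = fileio.TextSink()

    def open(self, fh):
        self._sink.open(fh)

    def write(self, record):
        # record is (gcs_file_dir, transformed_json_record); drop the filename.
        self._sink.write(record[1])

    def flush(self):
        self._sink.flush()


gcs_files = ['input/gcs_file_1.json', 'input/gcs_file_2.json', 'input/gcs_file_3.json']


def transform_func(record):
    # Illustrative placeholder: keep the filename, transform the JSON record.
    filename, json_record = record
    return filename, json_record.upper()


with beam.Pipeline() as p:
    (p
     | beam.Create(gcs_files)
     | ReadAllFromText(with_filename=True)
     | beam.Map(transform_func)  # still returns (filename, transformed_record)
     | fileio.WriteToFiles(
         path='output/',
         destination=lambda record: record[0],
         sink=lambda destination: DropKeyTextSink(),
         file_naming=fileio.destination_prefix_naming()))

Note that the file_naming callback would likely still need customizing to produce the exact output/<name>_transformed.json paths, since destination_prefix_naming() prefixes the full destination string (here, the input path) and appends shard information.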