Google Cloud Dataflow example


I was trying to insert multiple CSV files from Cloud Storage into BigQuery using the example linked below, but I am getting the error "AttributeError: 'FileCoder' object has no attribute 'to_type_hint'". Can someone please help me with this?

https://github.com/GoogleCloudPlatform/professional-services/blob/main/examples/dataflow-python-examples/batch-examples/cookbook-examples/pipelines/data_ingestion_configurable.py


There is 1 answer

robertwb

It looks like FileCoder is incorrectly not inheriting from beam.coders.Coder; I suspect fixing this will make the issue go away.

It would actually be preferable to use a DoFn rather than a Coder here anyway, e.g.

import csv
import io

import apache_beam as beam


class CsvLineDecoder(beam.DoFn):
    """Decode CSV lines read from the files into dicts keyed by column name."""

    def __init__(self, columns):
        self._columns = columns
        self._num_columns = len(columns)
        self._delimiter = ","

    def process(self, value):
        st = io.StringIO(value)
        cr = csv.DictReader(st,
                            self._columns,
                            delimiter=self._delimiter,
                            quotechar='"',
                            quoting=csv.QUOTE_MINIMAL)
        yield next(cr)

which would then be used as

(p
 | 'Read From Text - ' + input_file >> beam.io.ReadFromText(gs_path, skip_header_lines=1)
 | beam.ParDo(CsvLineDecoder(list(fields.keys())))
 ...)
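To sanity-check the decoding step outside a pipeline, the same csv.DictReader logic the DoFn uses can be run with just the standard library (the column names and sample line here are made up for illustration):

```python
import csv
import io

columns = ['id', 'name', 'score']  # hypothetical schema
line = '7,alice,3.5'               # one line as ReadFromText would emit it

# Mirrors CsvLineDecoder.process: wrap the line in a file-like object
# and pull one dict out of a DictReader.
reader = csv.DictReader(io.StringIO(line), columns, delimiter=',')
row = next(reader)
print(row)  # {'id': '7', 'name': 'alice', 'score': '3.5'}
```

Note that all values come back as strings; any casting to the BigQuery schema types would happen in a later step.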