Write to BigQuery table is failing in Dataflow pipeline


I am developing a Dataflow pipeline that reads a protobuf file from Google Cloud Storage, parses it, and writes the result to a BigQuery table. It works fine when the number of rows is around 20k, but it fails when the number of rows is around 200k. Below is the sample code:

Pipeline pipeline = Pipeline.create(options);

PCollection<PBClass> dataCol = pipeline
        .apply(FileIO.match().filepattern(options.getInputFile()))
        .apply(FileIO.readMatches())
        .apply("Read GPB File", ParDo.of(new ParseGpbFn()));

dataCol.apply("Transform to Delta", ParDo.of(deltaSchema))
        .apply(Flatten.iterables())
        .apply(
                BigQueryIO
                        //.write()
                        .writeTableRows()
                        .to(deltaSchema.tableSpec)
                        .withMethod(Method.STORAGE_WRITE_API)
                        .withSchema(schema)
                        //.withFormatFunction(irParDeltaSchema)
                        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
                        .withExtendedErrorInfo());

I have tried different combinations of the following methods:

withMethod
write
withFormatFunction

I have also tried different numbers of workers and different Compute Engine machine types. One of the variants is sketched below.
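
For example, the write() + withFormatFunction variant looks roughly like this (Delta stands for the element type produced by "Transform to Delta"; the format function body below is only a placeholder, not my actual mapping):

// Variant using BigQueryIO.write() with an explicit format function
// (the mapping below is a placeholder; the real one converts a Delta element to a TableRow).
SerializableFunction<Delta, TableRow> irParDeltaSchema =
        delta -> new TableRow().set("id", delta.getId());

dataCol.apply("Transform to Delta", ParDo.of(deltaSchema))
        .apply(Flatten.iterables())
        .apply(
                BigQueryIO.<Delta>write()
                        .to(deltaSchema.tableSpec)
                        .withFormatFunction(irParDeltaSchema)
                        .withMethod(Method.FILE_LOADS) // also tried STORAGE_WRITE_API
                        .withSchema(schema)
                        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
                        .withExtendedErrorInfo());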

Every time, it gets stuck at the GroupByKey stage and gives the following error:

Error message from worker: java.lang.RuntimeException: Failed to create job with prefix beam_bq_job_LOAD_testjobpackage_<...>, reached max retries: 3, last failed job: null.
    org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJob.runJob(BigQueryHelpers.java:199)
    org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJobManager.waitForDone(BigQueryHelpers.java:152)
    org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn.finishBundle(WriteTables.java:322)

Job steps table view and graph view: (screenshots not included here).

1 Answer

Answer from Eduardo Ortiz:
Error message from worker: java.lang.RuntimeException: Failed to create job with prefix beam_bq_job_LOAD_testjobpackage_<...>, reached max retries: 3, last failed job: null.
            org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJob.runJob(BigQueryHelpers.java:199)
            org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJobManager.waitForDone(BigQueryHelpers.java:152)
            org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn.finishBundle(WriteTables.java:322)

The error you are receiving (shown above) occurs because, somewhere in your code, the GCS URI of the file you want to load is malformed; the URI is expected to look something like gs://bucket/path/to/file.
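
For example (the bucket and object names below are placeholders), the value returned by options.getInputFile() should be a full GCS URI, and you can sanity-check it before building the pipeline:

// Expected shapes of the file pattern (placeholders):
//   gs://my-bucket/protobuf/input.pb
//   gs://my-bucket/protobuf/*.pb

// Quick sanity check before constructing the pipeline (a sketch, not a required API):
String inputFile = options.getInputFile();
if (inputFile == null || !inputFile.startsWith("gs://")) {
    throw new IllegalArgumentException(
            "Expected a GCS URI like gs://bucket/path/to/file, got: " + inputFile);
}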