I am developing a Dataflow pipeline that reads a protobuf file from Google Cloud Storage, parses it, and writes the result to a BigQuery table. It works fine when the number of rows is around 20k, but it fails when the number of rows is around 200k. Here is the sample code:
Pipeline pipeline = Pipeline.create(options);

PCollection<PBClass> dataCol = pipeline
    .apply(FileIO.match().filepattern(options.getInputFile()))
    .apply(FileIO.readMatches())
    .apply("Read GPB File", ParDo.of(new ParseGpbFn()));

dataCol
    .apply("Transform to Delta", ParDo.of(deltaSchema))
    .apply(Flatten.iterables())
    .apply(
        BigQueryIO
            //.write()
            .writeTableRows()
            .to(deltaSchema.tableSpec)
            .withMethod(Method.STORAGE_WRITE_API)
            .withSchema(schema)
            //.withFormatFunction(irParDeltaSchema)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
            .withExtendedErrorInfo());
I have tried different combinations of the following methods (a sketch of one such combination follows the stack trace below):

withMethod
write
withFormatFunction

I have also tried different numbers of workers and different Compute Engine machine types. Every time the pipeline gets stuck at the GroupByKey stage and gives the following error:
Error message from worker: java.lang.RuntimeException: Failed to create job with prefix beam_bq_job_LOAD_testjobpackage_<...>, reached max retries: 3, last failed job: null.
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJob.runJob(BigQueryHelpers.java:199)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$PendingJobManager.waitForDone(BigQueryHelpers.java:152)
org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn.finishBundle(WriteTables.java:322)
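
For reference, a minimal sketch of one of the combinations tried (explicit write() with withFormatFunction and file loads). The identity format function is an assumption, based on the elements after Flatten.iterables() already being TableRows; the rest mirrors the snippet above:

dataCol
    .apply("Transform to Delta", ParDo.of(deltaSchema))
    .apply(Flatten.iterables())
    .apply(
        BigQueryIO.<TableRow>write()           // write() instead of writeTableRows()
            .to(deltaSchema.tableSpec)
            .withFormatFunction(row -> row)    // assumption: elements are already TableRows
            .withMethod(Method.FILE_LOADS)     // batch load jobs instead of the Storage Write API
            .withSchema(schema)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));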


The error you are receiving (described above) occurs because somewhere in your code the GCS file you specify to load is malformed; the URI is expected to look something like this: gs://bucket/path/to/file.
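
If getInputFile() comes from a custom options interface, the value it returns must be a well-formed gs:// URI. A minimal sketch of such an interface, assuming your pipeline defines one like it (the name GpbPipelineOptions and the example bucket/path are hypothetical):

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

// Hypothetical options interface backing the options.getInputFile() call in the pipeline.
public interface GpbPipelineOptions extends PipelineOptions {
  @Description("GCS URI of the protobuf input, e.g. gs://my-bucket/path/to/file.pb")
  @Validation.Required
  String getInputFile();

  void setInputFile(String value);
}

The pipeline would then be launched with something like --inputFile=gs://my-bucket/path/to/file.pb; a local path or a URI missing the gs:// scheme would not match the expected format.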