I'm using `Bigquery.Tables` to fetch the schema of a BigQuery table from inside a `DoFn`. I pass an initialised `tableRequest` to the DoFn's constructor, as shown below:
```java
private static class FetchSchema extends DoFn<String, List<String>> {
    Bigquery.Tables tableRequest;
    ValueProvider<String> DestTableName;
    ValueProvider<String> mapCols;
    ValueProvider<String> recATableName;

    public FetchSchema(Bigquery.Tables tableReq, ValueProvider<String> table,
                       ValueProvider<String> mCols, ValueProvider<String> recATab) {
        this.tableRequest = tableReq;
        this.DestTableName = table;
        this.mapCols = mCols;
        this.recATableName = recATab;
    }

    // Splits "project:dataset.table" and returns the name and description of each field.
    private List<String> getTableParams(String tableString) throws IOException {
        String[] tableParams = new String[3];
        List<String> tableParamsList = new ArrayList<String>();
        tableParams[0] = tableString.substring(0, tableString.indexOf(":"));
        tableParams[1] = tableString.substring(tableString.indexOf(":") + 1, tableString.indexOf("."));
        // + 1 skips the "." so the table ID does not start with a dot
        tableParams[2] = tableString.substring(tableString.indexOf(".") + 1);
        Table table = tableRequest.get(tableParams[0], tableParams[1], tableParams[2]).execute();
        List<TableFieldSchema> fields = table.getSchema().getFields();
        for (int i = 0; i < fields.size(); i++) {
            tableParamsList.add(fields.get(i).getName());
            tableParamsList.add(fields.get(i).getDescription());
        }
        return tableParamsList;
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
        String[] mCols = mapCols.get().split(",");
        List<String> mapColsList = Arrays.asList(mCols);
        c.output(getTableParams(DestTableName.get()));
        c.output(getTableParams(recATableName.get()));
        c.output(mapColsList);
    }
}
```
But I get this error:
An exception occured while executing the Java class. null: InvocationTargetException: unable to serialize org.apache.beam.examples.flatFileTest$FetchSchema@6510b00e: com.google.api.services.bigquery.Bigquery$Tables
Any help please?
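Beam serializes your `DoFn` (using Java serialization) so it can be shipped to the workers that execute your pipeline on Dataflow. `Bigquery.Tables` is not `Serializable`, which is what the `unable to serialize ... Bigquery$Tables` error is reporting. In any case, a BigQuery client created on your local machine is not useful to the workers. Instead, you should create the `Bigquery.Tables` client within the `@StartBundle` method of your `DoFn`, and keep the field `transient` (or leave it unset until then) so the `DoFn` itself stays serializable. This method can take a `StartBundleContext` argument, which allows calling `getPipelineOptions()`.

Note: Ideally this would be possible in the `@Setup` method so the client could be reused across bundles, but it doesn't seem like the pipeline options are available there.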
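A minimal sketch of that approach follows. It assumes Beam 2.x with the GCP extensions, plus the `google-api-services-bigquery`, `google-auth-library-oauth2-http` and `google-http-client-gson` artifacts on the classpath. The client lives in a `transient` field, so it is skipped when the `DoFn` is serialized, and it is built lazily in `@StartBundle` from the credential returned by `GcpOptions#getGcpCredential()`. The application name is a placeholder, and table specs are assumed to be in `project:dataset.table` form as in the question.

```java
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.gson.GsonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.auth.http.HttpCredentialsAdapter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;

class FetchSchema extends DoFn<String, List<String>> {
    // Bigquery.Tables is not Serializable, so the field is transient: it is skipped
    // when Beam serializes the DoFn and rebuilt on the worker in startBundle().
    private transient Bigquery.Tables tableRequest;

    private final ValueProvider<String> destTableName;
    private final ValueProvider<String> mapCols;
    private final ValueProvider<String> recATableName;

    FetchSchema(ValueProvider<String> destTableName,
                ValueProvider<String> mapCols,
                ValueProvider<String> recATableName) {
        this.destTableName = destTableName;
        this.mapCols = mapCols;
        this.recATableName = recATableName;
    }

    @StartBundle
    public void startBundle(StartBundleContext c) {
        // Build the client once per DoFn instance; later bundles reuse it.
        if (tableRequest == null) {
            GcpOptions options = c.getPipelineOptions().as(GcpOptions.class);
            Bigquery bigquery = new Bigquery.Builder(
                    new NetHttpTransport(),
                    GsonFactory.getDefaultInstance(),
                    new HttpCredentialsAdapter(options.getGcpCredential()))
                .setApplicationName("fetch-schema-sketch") // placeholder name
                .build();
            tableRequest = bigquery.tables();
        }
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
        c.output(fieldNamesAndDescriptions(destTableName.get()));
        c.output(fieldNamesAndDescriptions(recATableName.get()));
        c.output(Arrays.asList(mapCols.get().split(",")));
    }

    // Splits "project:dataset.table" and returns name, description pairs
    // for each top-level field of that table's schema.
    private List<String> fieldNamesAndDescriptions(String tableSpec) throws IOException {
        int colon = tableSpec.indexOf(':');
        int dot = tableSpec.indexOf('.', colon + 1);
        Table table = tableRequest
            .get(tableSpec.substring(0, colon),
                 tableSpec.substring(colon + 1, dot),
                 tableSpec.substring(dot + 1))
            .execute();
        List<String> result = new ArrayList<>();
        for (TableFieldSchema field : table.getSchema().getFields()) {
            result.add(field.getName());
            result.add(field.getDescription());
        }
        return result;
    }
}
```

Because the constructor no longer takes a `Bigquery.Tables`, it would be applied as something like `ParDo.of(new FetchSchema(destTable, mapCols, recATable))`. The `tableRequest == null` check lets one `DoFn` instance reuse its client across all the bundles it processes, even though the client is created in `@StartBundle`.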