I'm using `Bigquery.Tables` to fetch the schema of a BigQuery table from inside a `DoFn`, passing an initialised `tableRequest` to the `DoFn` constructor as a parameter, as shown below:
```java
private static class FetchSchema extends DoFn<String, List<String>> {
    // Bigquery.Tables is not Serializable; holding it in a DoFn field triggers the error below.
    Bigquery.Tables tableRequest;
    ValueProvider<String> DestTableName;
    ValueProvider<String> mapCols;
    ValueProvider<String> recATableName;

    public FetchSchema(Bigquery.Tables tableReq, ValueProvider<String> table,
                       ValueProvider<String> mCols, ValueProvider<String> recATab) {
        this.tableRequest = tableReq;
        this.DestTableName = table;
        this.mapCols = mCols;
        this.recATableName = recATab;
    }

    private List<String> getTableParams(String tableString) throws IOException {
        // Split "project:dataset.table" into project ID, dataset ID and table ID.
        String[] tableParams = new String[3];
        List<String> tableParamsList = new ArrayList<String>();
        tableParams[0] = tableString.substring(0, tableString.indexOf(":"));
        tableParams[1] = tableString.substring(tableString.indexOf(":") + 1, tableString.indexOf("."));
        // +1 skips the "." so the table ID does not start with it.
        tableParams[2] = tableString.substring(tableString.indexOf(".") + 1);
        Table table = tableRequest.get(tableParams[0], tableParams[1], tableParams[2]).execute();
        // Collect each column's name followed by its description.
        List<TableFieldSchema> fields = table.getSchema().getFields();
        for (int i = 0; i < fields.size(); i++) {
            tableParamsList.add(fields.get(i).getName());
            tableParamsList.add(fields.get(i).getDescription());
        }
        return tableParamsList;
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
        String[] mCols = mapCols.get().split(",");
        List<String> mapColsList = Arrays.asList(mCols);
        c.output(getTableParams(DestTableName.get()));
        c.output(getTableParams(recATableName.get()));
        c.output(mapColsList);
    }
}
```
But I get this error:

```
An exception occured while executing the Java class. null: InvocationTargetException: unable to serialize org.apache.beam.examples.flatFileTest$FetchSchema@6510b00e: com.google.api.services.bigquery.Bigquery$Tables
```
Any help please?
`Bigquery.Tables` is not `Serializable`, so Beam cannot serialize the `FetchSchema` instance that holds it, which is what the error reports. More generally, a BigQuery client created on your local machine is not useful to the workers Dataflow uses to execute your pipeline. Instead, create the `Bigquery.Tables` client within the `@StartBundle` method of your `DoFn`. This method can take a `StartBundleContext` argument, which lets you call `getPipelineOptions()`.

Note: ideally this would be done in the `@Setup` method so the client could be reused across bundles, but the pipeline options don't seem to be available there.
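A minimal sketch of that approach follows, assuming the table strings have the simple form `project:dataset.table`. The client is built from `GcpOptions.getGcpCredential()` using `NetHttpTransport`, `GsonFactory` and `HttpCredentialsAdapter`; those choices (and the dependencies `google-api-services-bigquery`, `google-http-client-gson` and `google-auth-library-oauth2-http`) are assumptions, not the only way to construct the client. The application name `"fetch-schema"` and the `parseTableSpec` helper are hypothetical. The client field is `transient`, so it is never serialized with the `DoFn`, and the `null` check means it is built once per `DoFn` instance and reused by later bundles on that instance.

```java
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.gson.GsonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.auth.Credentials;
import com.google.auth.http.HttpCredentialsAdapter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;

public class FetchSchema extends DoFn<String, List<String>> {
  // Only serializable values are passed in through the constructor.
  private final ValueProvider<String> destTableName;
  private final ValueProvider<String> mapCols;
  private final ValueProvider<String> recATableName;

  // transient: skipped during serialization and rebuilt on each worker.
  private transient Bigquery.Tables tableRequest;

  public FetchSchema(ValueProvider<String> destTableName, ValueProvider<String> mapCols,
                     ValueProvider<String> recATableName) {
    this.destTableName = destTableName;
    this.mapCols = mapCols;
    this.recATableName = recATableName;
  }

  @StartBundle
  public void startBundle(StartBundleContext c) {
    // Build the client only if this DoFn instance does not have one yet.
    if (tableRequest == null) {
      Credentials credentials = c.getPipelineOptions().as(GcpOptions.class).getGcpCredential();
      Bigquery bigquery = new Bigquery.Builder(
              new NetHttpTransport(),
              GsonFactory.getDefaultInstance(),
              new HttpCredentialsAdapter(credentials))
          .setApplicationName("fetch-schema") // hypothetical name
          .build();
      tableRequest = bigquery.tables();
    }
  }

  // Hypothetical helper: splits "project:dataset.table" into {project, dataset, table}.
  public static String[] parseTableSpec(String spec) {
    int colon = spec.indexOf(':');
    int dot = spec.indexOf('.', colon + 1);
    if (colon < 0 || dot < 0) {
      throw new IllegalArgumentException("Expected project:dataset.table, got " + spec);
    }
    return new String[] {
        spec.substring(0, colon), spec.substring(colon + 1, dot), spec.substring(dot + 1)};
  }

  private List<String> getColumnNamesAndDescriptions(String spec) throws IOException {
    String[] parts = parseTableSpec(spec);
    Table table = tableRequest.get(parts[0], parts[1], parts[2]).execute();
    List<String> result = new ArrayList<>();
    // Each column contributes its name followed by its description.
    for (TableFieldSchema field : table.getSchema().getFields()) {
      result.add(field.getName());
      result.add(field.getDescription());
    }
    return result;
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws IOException {
    c.output(getColumnNamesAndDescriptions(destTableName.get()));
    c.output(getColumnNamesAndDescriptions(recATableName.get()));
    c.output(Arrays.asList(mapCols.get().split(",")));
  }
}
```

Because the constructor no longer takes a `Bigquery.Tables`, the pipeline only passes the `ValueProvider`s, and every field that does get serialized is `Serializable`.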