Is there any form to write to BigQuery specifying the name of destination tables dynamically?

Now I have:

bigQueryRQ
.apply(BigQueryIO.Write
    .named("Write")
    .to("project_name:dataset_name.table_name")
    .withSchema(Table.create_auditedTableSchema())
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

But I need the "table_name" to be a dynamic table name that depends on the data in the TableRow I want to write.


There are 2 answers

Davor Bonaci (accepted answer)

Unfortunately, we don't provide an API to name the BigQuery table in a data-dependent way. Generally speaking, data-dependent BigQuery table destinations can be error prone.

That said, we are working on improving flexibility in this area. No estimates at this time, but we hope to have this available soon.
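For reference, later Apache Beam 2.x releases added data-dependent destinations to BigQueryIO via to(SerializableFunction) and DynamicDestinations. The sketch below targets the Beam 2.x Java SDK (which did not exist when this answer was written); the routing rule on the "msg" field is purely illustrative, and Table.create_auditedTableSchema() is the helper from the question.

// Requires the Beam 2.x classes org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO,
// org.apache.beam.sdk.io.gcp.bigquery.TableDestination and
// org.apache.beam.sdk.values.ValueInSingleWindow.
bigQueryRQ.apply("Write", BigQueryIO.writeTableRows()
    // Choose the destination table per element; the "msg"-based rule is a placeholder.
    .to((ValueInSingleWindow<TableRow> value) -> {
        TableRow row = value.getValue();
        String suffix = ((String) row.get("msg")).substring(0, 1);
        return new TableDestination("project_name:dataset_name.table" + suffix, null);
    })
    .withSchema(Table.create_auditedTableSchema())
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));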

KrasK

I have the same problem. How about grouping the rows by tags and applying BigQueryIO.Write to every group separately?

public static class TagMarker extends DoFn<TableRow, TableRow> {

    private Map<String, TupleTag<TableRow>> tagMap;

    public TagMarker(Map<String, TupleTag<TableRow>> tagMap) {
        this.tagMap = tagMap;
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
        TableRow item = c.element();
        c.sideOutput(tagMap.get(getTagName(item)), item);
    }

    private String getTagName(TableRow row) {
        // Put your logic for choosing the destination table from the row here.
        return "table" + ((String) row.get("msg")).substring(0, 1);
    }
}


private static class GbqWriter extends PTransform<PCollection<TableRow>, PDone> {

    @Override
    public PDone apply(PCollection<TableRow> input) {

        // One tuple tag per destination table; mainTag is the ParDo's main output.
        TupleTag<TableRow> mainTag = new TupleTag<TableRow>();
        TupleTag<TableRow> tag2 = new TupleTag<TableRow>();
        TupleTag<TableRow> tag3 = new TupleTag<TableRow>();

        Map<String, TupleTag<TableRow>> tagMap = new HashMap<String, TupleTag<TableRow>>();
        tagMap.put("table1", mainTag);
        tagMap.put("table2", tag2);
        tagMap.put("table3", tag3);

        List<TupleTag<?>> tags = new ArrayList<TupleTag<?>>();
        tags.add(tag2);
        tags.add(tag3);

        PCollectionTuple result = input.apply(
            ParDo.withOutputTags(mainTag, TupleTagList.of(tags)).of(new TagMarker(tagMap))
        );

        // Write each tagged sub-collection to its own table.
        PDone done = null;
        for (String tableId : tagMap.keySet()) {
            done = writeToGbq(tableId, result.get(tagMap.get(tableId)).setCoder(TableRowJsonCoder.of()));
        }

        return done;
    }


    private PDone writeToGbq(String tableId, PCollection<TableRow> rows) {

        PDone done = rows
                .apply(BigQueryIO.Write.named("WriteToGbq")
                .to("<project>:<dataset>." + tableId)
                .withSchema(getSchema())
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        );

        return done;
    }

}

I am not sure about overwriting the variable done. Is that correct? Could it break the writes to BigQuery after a failure?

And this approach only works if you know the list of destination tables before parsing the rows.
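For reference, wiring the two transforms above into a pipeline could look roughly like the following. This is a minimal sketch against the same Dataflow 1.x SDK; it assumes the main method lives in the class that declares TagMarker and GbqWriter, and the in-memory input rows are placeholders whose "msg" field drives the table choice in getTagName().

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Create;

public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // A few in-memory rows just to exercise the transforms; the first character
    // of "msg" selects table1 or table2 in TagMarker.getTagName().
    p.apply(Create.of(
                new TableRow().set("msg", "1 goes to table1"),
                new TableRow().set("msg", "2 goes to table2"))
            .withCoder(TableRowJsonCoder.of()))
     .apply(new GbqWriter());

    p.run();
}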