How to convert PCollection<TableRow> to PCollection<KV<String, String>> in JAVA


I'm trying to convert a TableRow containing multiple values to a KV. I can achieve this in a DoFn, but that adds more complexity to the code I want to write further on and makes my job harder. (Basically I need to perform a CoGroupByKey operation on two PCollections of TableRow.)

Is there any way I can convert a PCollection<TableRow> to a PCollection<KV<String, String>>, where the keys and values are stored in the same format as present in the TableRow?

I wrote a snippet that looks something like this, but it doesn't give me the result I want. Is there any way I can load all the entries in the TableRow and generate KVs from those values?

ImmutableList<TableRow> input = ImmutableList.of(new TableRow().set("val1", "testVal1").set("val2", "testVal2").set("val3", "testVal3"));
PCollection<TableRow> inputPC = p.apply(Create.of(input));

inputPC.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                .via(tableRow -> KV.of((String) tableRow.get("val1"), (String) tableRow.get("val2"))));

There are 2 answers

Eduardo Ortiz

To convert from PCollection<String> into PCollection<TableRow> you can use the following code:

static class StringConverter extends DoFn<String, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        c.output(new TableRow().set("string_field", c.element()));
    }
}

Here you can read more on how to transform from a TableRow to a String.
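For the opposite direction, a minimal sketch of a TableRow-to-String DoFn might look like the following. It assumes Beam's `@ProcessElement` annotation style; `TableRow.toString()` renders the row's JSON representation, and the field name `"string_field"` is just an illustrative choice matching the code above:

```java
static class TableRowToString extends DoFn<TableRow, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        // Extract a single field as a String; use toString() instead
        // if you want the whole row serialized as JSON.
        c.output((String) c.element().get("string_field"));
    }
}
```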

Daniel Oliveira

It looks like what you want is a way to perform a Join on data obtained from BigQuery. There is no way to perform Joins on TableRows directly, because TableRows are not meant to be generally manipulated as elements in your pipeline; their purpose is specifically for reading and writing with BigQuery IO.

In order to be able to use existing Beam transforms, you'll want to convert those TableRows into a more useful representation, such as either a Java object you write yourself, or the Beam schema Row type. Since TableRow is essentially a dictionary of JSON strings, all you need to do is write a Map function that reads the appropriate types and parses them if necessary. For example:

PCollection<TableRow> tableRows = ... // Reading from BigQuery IO.
PCollection<Foo> foos = tableRows.apply(MapElements.via(
    new SimpleFunction<TableRow, Foo>() {
        @Override
        public Foo apply(TableRow row) {
            String bar = (String) row.get("bar");
            Integer baz = Integer.parseInt((String) row.get("baz"));
            return new Foo(bar, baz);
        }
    }));

Once you have the data in a type of your choice, you can find a way to perform a Join with built-in Beam transforms. There are many potential ways to do this so I won't list all of them, but a clear first choice to look at is the Join class.
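As a rough sketch of that approach: key each converted PCollection by the join field, then hand both to the join library (from `org.apache.beam.sdk.extensions.joinlibrary.Join`, a separate Beam extension artifact). Here `leftFoos`, `rightFoos`, the `Foo` type, and its `getBar()` accessor are hypothetical, continuing the example above:

```java
// Key each PCollection<Foo> by the field you want to join on.
// withKeyType is needed so Beam can infer the key coder from the lambda.
PCollection<KV<String, Foo>> keyedLeft = leftFoos.apply(
    WithKeys.of((Foo foo) -> foo.getBar())
            .withKeyType(TypeDescriptors.strings()));
PCollection<KV<String, Foo>> keyedRight = rightFoos.apply(
    WithKeys.of((Foo foo) -> foo.getBar())
            .withKeyType(TypeDescriptors.strings()));

// Inner join: emits one element per pair of matching keys.
PCollection<KV<String, KV<Foo, Foo>>> joined =
    Join.innerJoin(keyedLeft, keyedRight);
```

Under the hood the join library uses CoGroupByKey, so this also covers the CoGroupByKey use case mentioned in the question; if you need outer-join semantics, the same class offers leftOuterJoin and fullOuterJoin variants.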