I have a requirement to convert a windowed PCollection<KV<String, String>> into a List<KV<String, String>> in a pipeline running on Google Dataflow.
Basically, I want to combine all the KV<String, String> elements within each 120-second window into a single List<KV<String, String>>.
I tried the code below, but for some unknown reason the CombineFn does not seem to make any progress and the pipeline hangs.
Please let me know whether the approach is correct, or what I am doing wrong.
public static class CombineToListFn
        extends Combine.CombineFn<KV<String, String>, List<KV<String, String>>, List<KV<String, String>>> {

    @Override
    public List<KV<String, String>> createAccumulator() {
        System.out.println("createAccumulator called");
        return new ArrayList<>();
    }

    @Override
    public List<KV<String, String>> addInput(List<KV<String, String>> accumulator, KV<String, String> input) {
        System.out.println("input: " + input);
        accumulator.add(input);
        return accumulator;
    }

    @Override
    public List<KV<String, String>> mergeAccumulators(Iterable<List<KV<String, String>>> accumulators) {
        System.out.println("mergeAccumulators called");
        List<KV<String, String>> merged = new ArrayList<>();
        for (List<KV<String, String>> accumulator : accumulators) {
            merged.addAll(accumulator);
        }
        return merged;
    }

    @Override
    public List<KV<String, String>> extractOutput(List<KV<String, String>> accumulator) {
        System.out.println("extractOutput called");
        return accumulator;
    }
}
// Assign the elements to fixed 120-second windows
PCollection<KV<String, String>> groupedData = productsAsString.apply("GroupProductData",
        Window.into(FixedWindows.of(Duration.standardSeconds(120))));

// Combine the groupedData into a List of KV<String, String>
PCollection<List<KV<String, String>>> combinedData = groupedData
        .apply(Combine.globally(new CombineToListFn()).withoutDefaults());
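
To check the accumulator logic in isolation, here is a minimal sketch that mirrors the four CombineFn methods without any Beam dependency. The class CombineToListSketch, the kv helper, and combineWindow are hypothetical names introduced only for illustration, Map.Entry stands in for KV, and the two-way split merely imitates a runner handling one window in separate bundles. It is an assumption about the lifecycle, not how Dataflow actually schedules the work.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for CombineToListFn: same four steps, plain Java types.
public class CombineToListSketch {

    // Helper (assumption): Map.Entry plays the role of KV<String, String>.
    public static Map.Entry<String, String> kv(String key, String value) {
        return new SimpleEntry<>(key, value);
    }

    public static List<Map.Entry<String, String>> createAccumulator() {
        return new ArrayList<>();
    }

    public static List<Map.Entry<String, String>> addInput(
            List<Map.Entry<String, String>> accumulator, Map.Entry<String, String> input) {
        accumulator.add(input);
        return accumulator;
    }

    public static List<Map.Entry<String, String>> mergeAccumulators(
            Iterable<List<Map.Entry<String, String>>> accumulators) {
        List<Map.Entry<String, String>> merged = new ArrayList<>();
        for (List<Map.Entry<String, String>> accumulator : accumulators) {
            merged.addAll(accumulator);
        }
        return merged;
    }

    public static List<Map.Entry<String, String>> extractOutput(
            List<Map.Entry<String, String>> accumulator) {
        return accumulator;
    }

    // Imitates one window whose elements are split across two bundles,
    // each with its own accumulator, and then merged into a single list.
    public static List<Map.Entry<String, String>> combineWindow(
            List<Map.Entry<String, String>> elements) {
        int mid = elements.size() / 2;

        List<Map.Entry<String, String>> first = createAccumulator();
        for (Map.Entry<String, String> element : elements.subList(0, mid)) {
            first = addInput(first, element);
        }

        List<Map.Entry<String, String>> second = createAccumulator();
        for (Map.Entry<String, String> element : elements.subList(mid, elements.size())) {
            second = addInput(second, element);
        }

        return extractOutput(mergeAccumulators(Arrays.asList(first, second)));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> window =
                Arrays.asList(kv("a", "1"), kv("b", "2"), kv("c", "3"));
        System.out.println(combineWindow(window));
    }
}
```

If this sketch returns every element of the window in one list, the list-building logic itself is probably not what hangs, and the cause may lie elsewhere in the pipeline rather than in the CombineFn methods.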