How to use CombineFn to merge a windowed PCollection of KV<String, String> into a List<KV<String, String>>


I need to convert a windowed PCollection<KV<String, String>> into a List<KV<String, String>> in a pipeline that runs on Google Dataflow.

Basically, I want to combine all the KV<String, String> elements within each 120-second window into a single List<KV<String, String>>.

I tried the code below, but for some unknown reason the CombineFn does not seem to make any progress and hangs.

Please let me know whether the approach is correct, or what I am doing wrong.

public static class CombineToListFn
            extends Combine.CombineFn<KV<String, String>, List<KV<String, String>>, List<KV<String, String>>> {
        @Override
        public List<KV<String, String>> createAccumulator() {
            System.out.println("createAccumulator called");
            return new ArrayList<>();
        }

        @Override
        public List<KV<String, String>> addInput(List<KV<String, String>> accumulator, KV<String, String> input) {
            System.out.println("input: " + input);
            accumulator.add(input);
            return accumulator;
        }

        @Override
        public List<KV<String, String>> mergeAccumulators(Iterable<List<KV<String, String>>> accumulators) {
            System.out.println("mergeAccumulators called");
            List<KV<String, String>> merged = new ArrayList<>();
            for (List<KV<String, String>> accumulator : accumulators) {
                merged.addAll(accumulator);
            }
            return merged;
        }

        @Override
        public List<KV<String, String>> extractOutput(List<KV<String, String>> accumulator) {
            System.out.println("extractOutput called");
            return accumulator;
        }
    }
PCollection<KV<String, String>> groupedData = productsAsString.apply("GroupProductData",
                Window.into(FixedWindows.of(Duration.standardSeconds(120))));

// Combine the groupedData into a List of KV<String, String>
PCollection<List<KV<String, String>>> combinedData = groupedData
                .apply(Combine.globally(new CombineToListFn()).withoutDefaults());
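
One way to narrow this down is to check the combine logic on its own, away from the runner. The following is a minimal sketch in plain Java, without Beam on the classpath, that walks through the same four steps as CombineToListFn. The class name CombineToListSketch, the kv helper, and the simulate method are hypothetical, and Map.Entry is used only as a stand-in for KV<String, String>. It does not reproduce windowing, triggering, or coders, so it can only confirm that the list-building logic itself is sound.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class CombineToListSketch {

    // Hypothetical stand-in for Beam's KV.of(key, value), so the sketch runs without Beam.
    static Map.Entry<String, String> kv(String key, String value) {
        return new SimpleEntry<>(key, value);
    }

    // Mirrors createAccumulator(): start with an empty list.
    static List<Map.Entry<String, String>> createAccumulator() {
        return new ArrayList<>();
    }

    // Mirrors addInput(): append one element to the accumulator.
    static List<Map.Entry<String, String>> addInput(
            List<Map.Entry<String, String>> accumulator, Map.Entry<String, String> input) {
        accumulator.add(input);
        return accumulator;
    }

    // Mirrors mergeAccumulators(): concatenate the partial lists.
    static List<Map.Entry<String, String>> mergeAccumulators(
            Iterable<List<Map.Entry<String, String>>> accumulators) {
        List<Map.Entry<String, String>> merged = new ArrayList<>();
        for (List<Map.Entry<String, String>> accumulator : accumulators) {
            merged.addAll(accumulator);
        }
        return merged;
    }

    // Mirrors extractOutput(): the accumulator is already the result.
    static List<Map.Entry<String, String>> extractOutput(
            List<Map.Entry<String, String>> accumulator) {
        return accumulator;
    }

    // Simulates two partial accumulators (as if built on two workers) being merged.
    static List<Map.Entry<String, String>> simulate() {
        List<Map.Entry<String, String>> first =
                addInput(addInput(createAccumulator(), kv("a", "1")), kv("b", "2"));
        List<Map.Entry<String, String>> second =
                addInput(createAccumulator(), kv("c", "3"));
        return extractOutput(mergeAccumulators(Arrays.asList(first, second)));
    }

    public static void main(String[] args) {
        System.out.println(simulate());
    }
}
```

If this behaves as expected, the combine logic is probably not what hangs, and the windowing and input side of the pipeline would be the next place to look.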