How to simplify pipeline with aggregation and accumulation?

197 views Asked by At

I'm trying to design a pipeline that would be reading data from PubSubIO and aggregating it into one output fire every 60 seconds.

Input:

00:00:01 -> "1"
00:00:21 -> "2"
00:00:41 -> "3"
00:01:01 -> "4"
00:01:21 -> "5"
00:01:51 -> "6"

Expected output:

00:01:00 -> "1,2,3"
00:02:00 -> "1,2,3,4,5,6"

Here is my code:

pipeline
    .apply("Reading PubSub",
        PubsubIO
            .readMessagesWithAttributes()
            .fromSubscription("..."))
    .apply("Get message",
        ParDo.of(new DoFn<PubsubMessage, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                PubsubMessage ref = c.element();
                c.output(new String(ref.getPayload()));
            }
        }))
    .apply("Window",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterProcessingTime
                        .pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(60))))
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes())
    .apply("Accumulate result to iterable",
        Combine.globally(new CombineIterableAccumulatorFn<>()))
    .apply("toString()", ToString.elements())
    .apply("Write to file",
        TextIO
            .write()
            .withWindowedWrites()
            .withNumShards(1)
            .to("result"));


This is my CombineFn implementation for aggregation data into Iterable

public class CombineIterableAccumulatorFn<T> extends Combine.CombineFn<T, List<T>, Iterable<T>> {

    @Override
    public List<T> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<T> addInput(List<T> accumulator, T input) {
        accumulator.add(input);
        return accumulator;
    }

    @Override
    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
        return StreamSupport.stream(accumulators.spliterator(), false)
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    @Override
    public Iterable<T> extractOutput(List<T> accumulator) {
        return accumulator;
    }
}

With this realization I'm receiving next output:

00:01:00 -> "1,2,3"
00:02:00 -> "1,2,3
             1,2,3,4,5,6"

To remove duplicated "1,2,3" line at 00:02:00 I should add after line

.apply("Accumulate result to iterable",
    Combine.globally(new CombineIterableAccumulatorFn<>()))

additional Windowing block like this:

.apply("Window", 
    Window
        .<String>into(new GlobalWindows())
        .triggering(
            Repeatedly.forever(
                AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(60))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())

It all looks very complex. Is there any better options to implement this task?

0

There are 0 answers