I'm trying to design a pipeline that reads data from PubsubIO
and aggregates it, firing a single output every 60 seconds.
Input:
00:00:01 -> "1"
00:00:21 -> "2"
00:00:41 -> "3"
00:01:01 -> "4"
00:01:21 -> "5"
00:01:51 -> "6"
Expected output:
00:01:00 -> "1,2,3"
00:02:00 -> "1,2,3,4,5,6"
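To make the expected behaviour concrete, here is a small plain-Java sketch of what I want (no Beam classes involved). The class name, the helper `cumulativePerMinute`, and the idea of fixed 60-second boundaries are assumptions for illustration only; the real trigger fires relative to the first element in a pane, so the actual firing times will differ slightly.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; no Beam classes are used here.
class ExpectedOutputSketch {

    // Hypothetical helper: returns one cumulative, comma-joined line per 60-second boundary.
    // arrivalSeconds[i] is the arrival time of values[i] in seconds since 00:00:00 and is
    // assumed to be sorted in ascending order.
    static Map<Integer, String> cumulativePerMinute(int[] arrivalSeconds, String[] values, int minutes) {
        Map<Integer, String> out = new LinkedHashMap<>();
        List<String> seen = new ArrayList<>();
        int next = 0;
        for (int m = 1; m <= minutes; m++) {
            int boundary = m * 60;
            // Add everything that arrived before this boundary; earlier values are kept.
            while (next < values.length && arrivalSeconds[next] < boundary) {
                seen.add(values[next]);
                next++;
            }
            out.put(boundary, String.join(",", seen));
        }
        return out;
    }

    public static void main(String[] args) {
        int[] times = {1, 21, 41, 61, 81, 111};
        String[] values = {"1", "2", "3", "4", "5", "6"};
        // Prints "60s -> 1,2,3" and then "120s -> 1,2,3,4,5,6".
        cumulativePerMinute(times, values, 2)
            .forEach((sec, line) -> System.out.println(sec + "s -> " + line));
    }
}
```

Each firing should contain everything seen so far, exactly once, which is why I chose accumulating panes.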
Here is my code:
pipeline
    .apply("Reading PubSub",
        PubsubIO
            .readMessagesWithAttributes()
            .fromSubscription("..."))
    .apply("Get message",
        ParDo.of(new DoFn<PubsubMessage, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                PubsubMessage ref = c.element();
                // Decode explicitly as UTF-8 (java.nio.charset.StandardCharsets)
                // rather than relying on the platform default charset.
                c.output(new String(ref.getPayload(), StandardCharsets.UTF_8));
            }
        }))
    // Single global window, firing 60 seconds after the first element of each pane.
    .apply("Window",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterProcessingTime
                        .pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(60))))
            .withAllowedLateness(Duration.ZERO)
            // Each pane contains everything seen so far, not just the new elements.
            .accumulatingFiredPanes())
    .apply("Accumulate result to iterable",
        Combine.globally(new CombineIterableAccumulatorFn<>()))
    .apply("toString()", ToString.elements())
    .apply("Write to file",
        TextIO
            .write()
            .withWindowedWrites()
            .withNumShards(1)
            .to("result"));
This is my CombineFn implementation for aggregating the data into an Iterable:
public class CombineIterableAccumulatorFn<T> extends Combine.CombineFn<T, List<T>, Iterable<T>> {

    @Override
    public List<T> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<T> addInput(List<T> accumulator, T input) {
        accumulator.add(input);
        return accumulator;
    }

    @Override
    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
        return StreamSupport.stream(accumulators.spliterator(), false)
            .flatMap(List::stream)
            .collect(Collectors.toList());
    }

    @Override
    public Iterable<T> extractOutput(List<T> accumulator) {
        return accumulator;
    }
}
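For reference, this is how I understand the CombineFn lifecycle: the runner may create several accumulators, add inputs to each, and merge them before extracting the output. The plain-Java sketch below mimics that with static methods and no Beam dependency. The class name, `combineInTwoBundles`, and the two-way split are made up for illustration, and a real runner gives no guarantee about element order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in for the CombineFn lifecycle; the two-bundle split is hypothetical.
class CombineLifecycleSketch {

    static <T> List<T> createAccumulator() {
        return new ArrayList<>();
    }

    static <T> List<T> addInput(List<T> accumulator, T input) {
        accumulator.add(input);
        return accumulator;
    }

    static <T> List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
        List<T> merged = new ArrayList<>();
        for (List<T> accumulator : accumulators) {
            merged.addAll(accumulator);
        }
        return merged;
    }

    // Simulates two partial accumulators that are filled separately and then merged.
    static List<String> combineInTwoBundles(List<String> first, List<String> second) {
        List<String> a = createAccumulator();
        for (String s : first) {
            addInput(a, s);
        }
        List<String> b = createAccumulator();
        for (String s : second) {
            addInput(b, s);
        }
        return mergeAccumulators(Arrays.asList(a, b));
    }

    public static void main(String[] args) {
        // Prints [1, 2, 3]
        System.out.println(combineInTwoBundles(Arrays.asList("1", "2"), Arrays.asList("3")));
    }
}
```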
With the pipeline above, I get the following output:
00:01:00 -> "1,2,3"
00:02:00 -> "1,2,3
1,2,3,4,5,6"
To remove the duplicated "1,2,3" line at 00:02:00,
I have to add, after the step
    .apply("Accumulate result to iterable",
        Combine.globally(new CombineIterableAccumulatorFn<>()))
an additional windowing block like this:
    .apply("Window",
        Window
            .<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterProcessingTime
                        .pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(60))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
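My current understanding of why this second block helps, which may well be wrong, is that the write step groups elements again downstream of the combine, and in accumulating mode that grouping keeps every pane it has already received. The plain-Java sketch below models only that idea. The class and the method `downstreamFirings` are hypothetical, and it assumes one downstream firing per upstream pane.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of accumulating vs. discarding panes; not Beam code.
class PaneModeSketch {

    // Returns what a downstream grouping step would hand to the writer at each firing,
    // assuming (for simplicity) that it fires once per upstream pane.
    static List<List<String>> downstreamFirings(List<String> upstreamPanes, boolean accumulating) {
        List<List<String>> firings = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        for (String pane : upstreamPanes) {
            buffer.add(pane);
            firings.add(new ArrayList<>(buffer));
            if (!accumulating) {
                // Discarding mode: forget what has already been emitted.
                buffer.clear();
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        // The combine step already emits cumulative panes.
        List<String> panes = Arrays.asList("1,2,3", "1,2,3,4,5,6");
        for (boolean accumulating : new boolean[] {true, false}) {
            System.out.println(accumulating ? "accumulating:" : "discarding:");
            for (List<String> firing : downstreamFirings(panes, accumulating)) {
                System.out.println("  " + String.join(" | ", firing));
            }
        }
    }
}
```

In this model the second accumulating firing carries both "1,2,3" and "1,2,3,4,5,6", which matches the duplicated line I see, while the discarding variant carries only the latest pane.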
Having to window twice like this looks overly complex. Is there a better way to implement this task?