Early firing in Flink - how to emit early window results to a different DataStream with a trigger

I'm working with code that uses a tumbling window of one day, and would like to send early results to a different DataStream on an hourly basis. I understand that triggers are a way to go here, but don't really see how it would work.

The current code is as follows:

myStream
     .keyBy(..)
     .window(TumblingEventTimeWindows.of(Time.days(1)))
     .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())

In my understanding, I should register a trigger, and then in its onEventTime method get hold of a TriggerContext and send data to the labeled output from there. But how do I get the current state of MyAggregateFunction from there? Or would I need to do my own computation inside onEventTime()?

Also, the documentation states that "By specifying a trigger using trigger() you are overwriting the default trigger of a WindowAssigner.". Would my one day window then still fire correctly, or do I need to trigger it somehow differently?

Another way of doing this is to create two different operators - one that windows by 1 hour, and another that windows by 1 day. Would triggers be preferable to that approach?

There are 3 answers

0
kkrugler (best answer)

Normally when I get to more complex behavior like this, I use a KeyedProcessFunction. You can aggregate (and save in state) hourly and daily results, set timers as needed, and use a side output for the hourly results versus the regular output for the daily results.
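
To make the idea above concrete, here is a minimal plain-Java model of the per-key logic. It is not Flink code: in a real job the fields would live in keyed state inside a KeyedProcessFunction, the loop in onWatermark would be replaced by registered event-time timers, and the "early" list would be a side output. The class name, the use of a running sum as the aggregate, and the assumption that events arrive in timestamp order are all illustrative choices, not something taken from the question.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the per-key logic; none of this is Flink API. */
class EarlyResultsModel {
    static final long HOUR = 3_600_000L;
    static final long DAY = 24 * HOUR;

    private long daySum = 0;     // stands in for keyed state: aggregate of the open day
    private long nextTimer = -1; // next hour boundary to fire at; -1 means none registered

    // Each entry is {boundary timestamp, sum at that moment}.
    final List<long[]> early = new ArrayList<>(); // stand-in for the hourly side output
    final List<long[]> daily = new ArrayList<>(); // stand-in for the regular output

    /** Handles one event. Assumes in-order input, so the event time doubles as a watermark. */
    void onElement(long ts, long value) {
        onWatermark(ts);
        if (nextTimer < 0) {
            // First event of a new day: register the timer for the end of its hour.
            nextTimer = ts - (ts % HOUR) + HOUR;
        }
        daySum += value;
    }

    /** Fires every pending hourly timer up to and including the watermark. */
    void onWatermark(long watermark) {
        while (nextTimer >= 0 && nextTimer <= watermark) {
            if (nextTimer % DAY == 0) {
                // Day boundary: emit the final result and clear the state.
                daily.add(new long[] {nextTimer, daySum});
                daySum = 0;
                nextTimer = -1;
            } else {
                // Hour boundary inside the day: emit a partial result, keep the state.
                early.add(new long[] {nextTimer, daySum});
                nextTimer += HOUR;
            }
        }
    }
}
```

Feeding a few events through onElement and then calling onWatermark with a time past the end of the day leaves the partial sums in early and the finished days in daily. Handling out-of-order events would need extra care (for example buffering per hour), which this sketch deliberately leaves out.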

0
Dominik Wosiński

There are quite a few questions here; I will try to answer all of them. First of all, if you specify your own trigger using trigger(), you override the default trigger, so the window may no longer behave the way it would by default. For example, if you create a 1 day event time tumbling window but override the trigger so that it fires on every 20th element, it will never fire based on event time.

Now, after your custom trigger fires, the output from MyAggregateFunction will be passed to MyProcessWindowFunction, so it will work the same as with the default trigger; you don't need to access MyAggregateFunction from inside the trigger.
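
A small plain-Java sketch may help to picture what a replacement trigger has to do so that the day window still fires at its end. It only computes the event-time instants at which such a trigger would fire for one window; it is a model, not Flink API, and the class and method names are made up for illustration. Flink ships a ContinuousEventTimeTrigger that, as far as I know, follows a schedule of this kind, but check its documentation for your version before relying on that.

```java
import java.util.ArrayList;
import java.util.List;

/** Model of an "early firing" schedule for one window; not Flink API. */
class EarlyFiringSchedule {
    /**
     * Returns the event-time instants at which a window [start, end) would emit:
     * once per interval while the window is open, plus the final firing.
     */
    static List<Long> firingTimes(long start, long end, long interval) {
        List<Long> times = new ArrayList<>();
        for (long t = start + interval; t < end; t += interval) {
            times.add(t); // early firing: emit the aggregate so far, keep the window state
        }
        // Final firing at the window's last millisecond. A custom trigger that
        // omits this step never produces the complete daily result.
        times.add(end - 1);
        return times;
    }
}
```

For a one-day window with a one-hour interval this gives 23 early firings followed by the final one. On each of them the current result of the aggregate function is handed to the window function, as described above.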

Finally, while it may be technically possible to implement a trigger that emits partial results every hour, my personal opinion is that you should probably go with the two separate windows. While this solution may create slightly more overhead and may result in larger state, it should be much clearer, easier to implement, and much less error-prone.

7
David Anderson

Rather than using a custom Trigger, it would be simpler to have two layers of windowing, where the hourly results are further aggregated into daily results. Something like this:

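// First layer: one result per key per hour.
hourlyResults = myStream
    .keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction());

// Second layer: roll the hourly results up into one result per key per day.
// This assumes MyAggregateFunction can take the hourly results as its input.
dailyResults = hourlyResults
    .keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.days(1)))
    .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction());

hourlyResults.addSink(...);
dailyResults.addSink(...);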

Note that the result of a window is not a KeyedStream, so you will need to use keyBy again, unless you can arrange to leverage reinterpretAsKeyedStream.
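
The reason the second layer lines up with the first is that Flink stamps each window result with the window's max timestamp (its end minus one millisecond), so the result of the last hour of a day still falls inside that day. The plain-Java sketch below models this with sums; it is not Flink code, the names are hypothetical, and it assumes non-negative timestamps, no window offset, and at most one value per timestamp.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of two stacked tumbling windows; not Flink API. */
class TwoLayerWindows {
    static final long HOUR = 3_600_000L;
    static final long DAY = 24 * HOUR;

    /** Start of the tumbling window of the given size that contains ts. */
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    /**
     * Sums the values per tumbling window. Each result is keyed by the
     * window's max timestamp (end - 1), mirroring how window results are stamped.
     */
    static TreeMap<Long, Long> sumPerWindow(Map<Long, Long> timestamped, long size) {
        TreeMap<Long, Long> out = new TreeMap<>();
        for (Map.Entry<Long, Long> e : timestamped.entrySet()) {
            long maxTimestamp = windowStart(e.getKey(), size) + size - 1;
            out.merge(maxTimestamp, e.getValue(), Long::sum);
        }
        return out;
    }
}
```

Calling sumPerWindow(events, HOUR) and then sumPerWindow on that result with DAY gives the same daily totals as windowing the raw events by day directly. That equivalence holds for sums and counts; for something like an average, the hourly result would have to carry enough information (for example sum and count) for the daily layer to combine it correctly.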