//Creating a window of ten items
WindowedStream<ObservationEvent,Tuple,GlobalWindow> windowStream = inputStream.keyBy("rackId").countWindow(10);
// Applying a Window Function , adding some custom evaluating all the values in the window
DataStream<ObservationEvent> inactivityStream = windowStream.apply(new WindowFunction<ObservationEvent, ObservationEvent , Tuple , GlobalWindow>() {
@Override
public void apply(Tuple tuple, GlobalWindow timeWindow, Iterable<ObservationEvent> itr, Collector<ObservationEvent> out)
//custom evaluation logic
out.collect(new ObservationEvent(1,"temperature", "stable"));
}
});
//Defining Simple CEP Pattern
Pattern<ObservationEvent, ?> inactivityPattern = Pattern.ObservationEvent>begin("first")
.subtype(ObservationEvent.class)
.where(new FilterFunction<ObservationEvent>() {
@Override
public boolean filter(ObservationEvent arg0) throws Exception {
System.out.println( arg0 ); //This function is not at all called
return false;
}
});
PatternStream<ObservationEvent> inactivityCEP = CEP.pattern(inactivityStream.keyBy("rackId"), inactivityPattern);
When I run this code, the filter function inside the where clause is not at all getting called. I have printed the inactivityStream.print() and I can see the matching value.
Now, when I plug in the inputStream directly without applying a window. The pattern is matching
I printed inputStream and WindowedStream and I can see they both send similar kind of data.
What am I missing
The FilterFunction should be getting called eventually but you are going to have to wait for 10 events for the SAME key before you see your FilterFunction called for the first time. Could it be that you are just not waiting long enough in your windowing test?
Keep in mind that if you have many unique keys this implies you will have to wait well more than 10 times as long in the window test before you'll see your filter function called.