CEP - Pattern not executed after adding Window

282 views Asked by At

//Creating a window of ten items

WindowedStream<ObservationEvent,Tuple,GlobalWindow> windowStream = inputStream.keyBy("rackId").countWindow(10);

// Applying a Window Function , adding some custom evaluating all the values in the window

DataStream<ObservationEvent> inactivityStream = windowStream.apply(new WindowFunction<ObservationEvent, ObservationEvent , Tuple , GlobalWindow>() { 

                        @Override 
                        public void apply(Tuple tuple, GlobalWindow timeWindow, Iterable<ObservationEvent> itr, Collector<ObservationEvent> out) 
                                //custom evaluation logic 
                                out.collect(new ObservationEvent(1,"temperature", "stable")); 
                        } 
                });

//Defining Simple CEP Pattern

 Pattern<ObservationEvent, ?> inactivityPattern = Pattern.ObservationEvent>begin("first") 
                                .subtype(ObservationEvent.class) 
                                .where(new FilterFunction<ObservationEvent>() { 

                                        @Override 
                                        public boolean filter(ObservationEvent arg0) throws Exception { 
                                                System.out.println( arg0 );  //This function is not at all called
                                                return false; 
                                        } 
                });



PatternStream<ObservationEvent> inactivityCEP = CEP.pattern(inactivityStream.keyBy("rackId"), inactivityPattern); 

When I run this code, the filter function inside the where clause is not at all getting called. I have printed the inactivityStream.print() and I can see the matching value.

Now, when I plug in the inputStream directly without applying a window. The pattern is matching

I printed inputStream and WindowedStream and I can see they both send similar kind of data.

What am I missing

1

There are 1 answers

2
Jamie Grier On

The FilterFunction should be getting called eventually but you are going to have to wait for 10 events for the SAME key before you see your FilterFunction called for the first time. Could it be that you are just not waiting long enough in your windowing test?

Keep in mind that if you have many unique keys this implies you will have to wait well more than 10 times as long in the window test before you'll see your filter function called.