This is my code.
SplitStream<MonitoringEvent> splitStream = inputStream.split(new OutputSelector<MonitoringEvent>() {
    @Override
    public Iterable<String> select(MonitoringEvent me) {
        // Route each event to an output named after its event type.
        List<String> ml = new ArrayList<String>();
        ml.add(me.getEventType());
        return ml;
    }
});
I have a stream of monitoring events arriving in random order: temp: 80, pressure: 70, humidity: 80, temp: 30, ...
With the above code I am splitting the stream by eventType, i.e. into temperatureStream, pressureStream, and so on.
The problem is this: if I know the eventType, I can select it from the splitStream like
splitStream.select("temperatureStream")
but the eventType is dynamic and not pre-defined.
How can I apply CEP to this dynamic stream? The CEP rules would be something like: if the
temperature is > 90 for the past 10 minutes ...
pressure is > 90 for the past 10 minutes ...
Correct me if I'm wrong, but I don't think a dynamic lookup on select is possible, because of Flink's parallelism. Your program gets translated into parallel instructions for Flink's TaskManagers, and the JobManager coordinates these actions. Without complete knowledge of your program's structure up front (its abstract syntax tree), no parallelism could be applied at all... Maybe you could find some common attribute that all messages share and differentiate from there.
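To make the "common attribute" idea a bit more concrete, here is a minimal plain-Java sketch. It is not Flink code: Event, groupByType and aboveThreshold are hypothetical names I made up, and Event is only a stand-in for your MonitoringEvent. The point is that the event type is used as a grouping key discovered at runtime, and one generic rule ("every reading in the last N milliseconds is above the threshold") is applied per key, so no stream name has to be known in advance. In Flink the analogous step would presumably be keying the stream by event type (keyBy) and applying a single CEP pattern to the keyed stream, but that mapping is an assumption on my part and I have not tested it against your job.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class KeyedThresholdSketch {

    /** Hypothetical stand-in for MonitoringEvent: type, reading, timestamp. */
    static final class Event {
        final String eventType;
        final double value;
        final long timestampMillis;

        Event(String eventType, double value, long timestampMillis) {
            this.eventType = eventType;
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    /** Groups events by their eventType; the set of types is discovered at runtime. */
    static Map<String, List<Event>> groupByType(List<Event> events) {
        Map<String, List<Event>> groups = new LinkedHashMap<>();
        for (Event e : events) {
            groups.computeIfAbsent(e.eventType, k -> new ArrayList<>()).add(e);
        }
        return groups;
    }

    /**
     * For each event type, reports true when at least one reading falls in the
     * window (now - windowMillis, now] and every reading in that window is
     * strictly above the threshold.
     */
    static Map<String, Boolean> aboveThreshold(List<Event> events, double threshold,
                                               long windowMillis, long now) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Event>> group : groupByType(events).entrySet()) {
            boolean seenInWindow = false;
            boolean allAbove = true;
            for (Event e : group.getValue()) {
                boolean inWindow = e.timestampMillis > now - windowMillis
                        && e.timestampMillis <= now;
                if (inWindow) {
                    seenInWindow = true;
                    if (e.value <= threshold) {
                        allAbove = false;
                    }
                }
            }
            result.put(group.getKey(), seenInWindow && allAbove);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("temp", 95.0, 1000L));
        events.add(new Event("pressure", 95.0, 1000L));
        events.add(new Event("temp", 97.0, 2000L));
        events.add(new Event("pressure", 70.0, 2000L));

        // 10-minute window ending at t = 3000 ms, threshold 90.
        System.out.println(aboveThreshold(events, 90.0, 600_000L, 3000L));
        // prints {temp=true, pressure=false}
    }
}
```

A new event type (say humidity) would simply show up as another key in the result, with no change to the code. This batch version only illustrates the grouping; a streaming job would evaluate the same rule incrementally per key.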