I'm trying to detect patterns over an original input stream, and drop the first event of each match from the original input stream to create a modified stream. Then, detect other patterns on the modified stream. The problem here is when I find the matches of patterns, how can I drop the resulting stream from the original input stream? e.g., see code below:
List<DataEvent> events = readDataSet(filePath,"data.csv");
DataStream<DataEvent> eventStream = env.fromCollection(events);
PatternStream<DataEvent> publicPatternStream = CEP.pattern(eventStream, randomPatterns.get(num)).inProcessingTime();
DataStream<DataEvent> results = publicPatternStream.process(new PatternProcessFunction<DataEvent, DataEvent>() {
@Override
public void processMatch(Map<String, List<DataEvent>> map, Context context, Collector<DataEvent> collector) throws Exception {
collector.collect(map.get("start").get(0));
}
});
Now, the question is "how can I remove the events of results data stream from eventStream?" One option is to write them in the file, but this way I need to run another job for the next patterns. I wonder if I can put the events of results data stream into a list and then update the initial event list. I appreciate any help.