Filtering a stream from another stream in one Flink execution


I'm trying to detect patterns on an input stream, then drop the first event of each match from that stream to produce a modified stream, on which I want to detect further patterns. The problem is that once I have the matches, I don't know how to remove the resulting events from the original input stream. See the code below:

// Read the input events and turn them into a bounded stream.
List<DataEvent> events = readDataSet(filePath, "data.csv");
DataStream<DataEvent> eventStream = env.fromCollection(events);

// Apply one of the patterns to the original stream.
PatternStream<DataEvent> publicPatternStream =
        CEP.pattern(eventStream, randomPatterns.get(num)).inProcessingTime();

// Emit the first event of each match (the one bound to "start").
DataStream<DataEvent> results = publicPatternStream.process(
        new PatternProcessFunction<DataEvent, DataEvent>() {
            @Override
            public void processMatch(Map<String, List<DataEvent>> map, Context context,
                                     Collector<DataEvent> collector) throws Exception {
                collector.collect(map.get("start").get(0));
            }
        });

So the question is: how can I remove the events in the results stream from eventStream? One option is to write them to a file, but then I would need to run a separate job for the next patterns. I wonder whether I can instead collect the events of the results stream into a list and then update the initial event list. I'd appreciate any help.
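
To make the intended result concrete, here is a minimal sketch in plain Java, without Flink, of the list update I have in mind. It is only an illustration of the semantics, not a streaming solution. The Event class and its id field are hypothetical stand-ins for DataEvent, and the sketch assumes every event carries a unique id that can be used to recognize it again:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DropMatchedEvents {

    // Hypothetical stand-in for DataEvent; assumes each event has a unique id.
    static final class Event {
        final long id;
        final String payload;

        Event(long id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    // Returns a new list holding every input event whose id is NOT in matchedIds.
    // The input list is left unchanged and the original order is preserved.
    static List<Event> dropMatched(List<Event> input, Set<Long> matchedIds) {
        List<Event> remaining = new ArrayList<>();
        for (Event e : input) {
            if (!matchedIds.contains(e.id)) {
                remaining.add(e);
            }
        }
        return remaining;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(1, "a"), new Event(2, "b"),
                new Event(3, "c"), new Event(4, "d"));

        // Pretend the first events of two matches were the events with ids 1 and 3.
        Set<Long> matchedIds = new HashSet<>(List.of(1L, 3L));

        List<Event> modified = dropMatched(events, matchedIds);
        for (Event e : modified) {
            System.out.println(e.id + " " + e.payload);
        }
    }
}
```

In the real job, matchedIds would have to come from the results stream, and the modified list would become the input for the next pattern. What I can't figure out is how to do that inside a single Flink execution.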

There are 0 answers