Processing multiple patterns in Flink CEP in Parallel


I have the following scenario.


Two virtual machines send streams to Kafka. A CEP engine consumes those streams and generates warnings when particular conditions are satisfied on an individual stream.

Currently, the CEP engine checks the same conditions on both streams (heart rate > 65 and respiration rate > 68) for both patients and raises alarms in parallel, as shown below:

        // detecting pattern
        Pattern<joinEvent, ? > pattern = Pattern.<joinEvent>begin("start")
                .subtype(joinEvent.class).where(new FilterFunction<joinEvent>() {
                    @Override
                    public boolean filter(joinEvent joinEvent) throws Exception {
                        return joinEvent.getHeartRate() > 65 ;
                    }
                })
                .subtype(joinEvent.class)
                .where(new FilterFunction<joinEvent>() {
                    @Override
                    public boolean filter(joinEvent joinEvent) throws Exception {
                        return joinEvent.getRespirationRate() > 68;
                    }
                }).within(Time.milliseconds(100));

But I want to use a different set of conditions for each stream. For example, I would like to raise an alarm in these cases:

For patient A : if heart rate > 65 and Respiration Rate > 68
For patient B : if heart rate > 75 and Respiration Rate > 78

How do I achieve this? Do I need to create multiple stream environments, or multiple patterns in the same environment?

Biplob Biswas On BEST ANSWER

For your requirements, you can create two separate patterns, one per patient, if you want a clear separation.

If you want to do this with a single pattern, that is possible as well. To do so, read all your Kafka topics in one Kafka source:

    FlinkKafkaConsumer010<JoinEvent> kafkaSource = new FlinkKafkaConsumer010<>(
        Arrays.asList("topic1", "topic2"),
        new StringSerializerToEvent(),
        props);

Here I am assuming that the events from both topics have the same structure and that the patient name is transmitted as part of each event.

Once you have done that, it becomes easy: you just need to create a pattern with "or", something like the following:

    Pattern.<JoinEvent>begin("first")
        .where(new SimpleCondition<JoinEvent>() {

          @Override
          public boolean filter(JoinEvent event) throws Exception {
            return event.getPatientName().equals("A") && event.getHeartRate() > 65 && event.getRespirationRate() > 68;
          }
        })
        .or(new SimpleCondition<JoinEvent>() {

          @Override
          public boolean filter(JoinEvent event) throws Exception {
            return event.getPatientName().equals("B") && event.getHeartRate() > 75 && event.getRespirationRate() > 78;
          }
        });

This produces a match whenever an event satisfies either condition. However, I am not really sure what ".within(Time.milliseconds(100))" is achieving in your example.
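
If more patients are added later, the per-patient checks could be driven from a lookup table rather than one "or" branch per patient. The following is only a minimal plain-Java sketch of that idea, not part of the Flink API: the names PatientAlarmRules, Thresholds and shouldAlarm are hypothetical, and the threshold values are the ones from the question.

```java
import java.util.Map;

// Hypothetical helper (not part of Flink): per-patient alarm thresholds in a lookup table.
final class PatientAlarmRules {

    // Lower bounds that both readings must exceed before an alarm is raised.
    static final class Thresholds {
        final double heartRate;
        final double respirationRate;

        Thresholds(double heartRate, double respirationRate) {
            this.heartRate = heartRate;
            this.respirationRate = respirationRate;
        }
    }

    // Values taken from the question: patient A uses 65/68, patient B uses 75/78.
    private static final Map<String, Thresholds> RULES = Map.of(
            "A", new Thresholds(65, 68),
            "B", new Thresholds(75, 78));

    // Returns true only when the patient is known and both readings
    // exceed that patient's thresholds.
    static boolean shouldAlarm(String patientName, double heartRate, double respirationRate) {
        if (patientName == null) {
            return false; // Map.of(...) maps reject null keys on lookup
        }
        Thresholds t = RULES.get(patientName);
        return t != null
                && heartRate > t.heartRate
                && respirationRate > t.respirationRate;
    }
}
```

Under these assumptions, a single SimpleCondition could return PatientAlarmRules.shouldAlarm(event.getPatientName(), event.getHeartRate(), event.getRespirationRate()) from its filter method, so adding a patient means adding one table entry rather than another condition.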