I have following case scenario
There are 2 Virtual machines which are sending streams to Kafka which are being received by CEP engine where warnings are generated when particular conditions are satisfied on the individual Stream.
Currently, CEP is checking for same conditions on both streams( when heart rate > 65 and respiration rate > 68) for both patients and raising alarms in Parallel as shown below
// detecting pattern
Pattern<joinEvent, ? > pattern = Pattern.<joinEvent>begin("start")
.subtype(joinEvent.class).where(new FilterFunction<joinEvent>() {
@Override
public boolean filter(joinEvent joinEvent) throws Exception {
return joinEvent.getHeartRate() > 65 ;
}
})
.subtype(joinEvent.class)
.where(new FilterFunction<joinEvent>() {
@Override
public boolean filter(joinEvent joinEvent) throws Exception {
return joinEvent.getRespirationRate() > 68;
}
}).within(Time.milliseconds(100));
But I want to use different conditions for both Streams. For example, I would like to raise alarm if
For patient A : if heart rate > 65 and Respiration Rate > 68
For patient B : if heart rate > 75 and Respiration Rate > 78
How do I achieve this ? do I need to create multiple stream environments or multiple patterns in the same environment.
For your requirements, you can create 2 different patterns to have clear separation if you want.
If you want to perform this with the same pattern then it would be possible as well. To do this, read all your kafka topics in one kafka source:
Here I am assuming that the structure of your event from both the topics are the same and you have the patient name as well as part of the event which is trasnmitted.
Once you did that, it becomes easy as you just need to create a pattern with "Or", something like the following:
This would produce a match whenever your condition matches. Although, I am not really sure what ".within(Time.milliseconds(100))" is achieving in your example.