I want to detect whether two events with the same identifier occur within a defined timeframe. For example, a DoorEvent looks like this:
<doorevent>
  <door>
    <id>1</id>
    <status>open</status>
  </door>
  <timestamp>12345679</timestamp>
</doorevent>
<doorevent>
  <door>
    <id>1</id>
    <status>close</status>
  </door>
  <timestamp>23456790</timestamp>
</doorevent>
My DoorEvent Java class in the example below has the same structure.
I want to detect that the door with id 1 closes within 5 minutes of opening. I am trying to use the Apache Flink CEP library for this. The incoming stream contains all open and close messages from, let's say, 20 doors.
Pattern<String, ?> pattern = Pattern.<String>begin("door_open").where(
        new SimpleCondition<String>() {
            private static final long serialVersionUID = 1L;
            public boolean filter(String doorevent) {
                DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML);
                if (event.getDoor().getStatus().equals("open")){
                    // save state of door as open
                    return true;
                }
                return false;
            }
        }
    )
    .followedByAny("door_close").where(
        new SimpleCondition<String>() {
            private static final long serialVersionUID = 1L;
            public boolean filter(String doorevent) throws JsonParseException, JsonMappingException, IOException {
                DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML);
                if (event.getDoor().getStatus().equals("close")){
                    // check if close is of previously opened door
                    return true;
                }
                return false;
            }
        }
    )
    .within(Time.minutes(5));
How do I save the state of door 1 as open in the door_open step, so that in the door_close step I know that door 1 is the one being closed and not some other door?
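If you have Flink 1.3.0 or above, what you want to do is really straightforward.
Your pattern would use an IterativeCondition for the door_close step instead of a SimpleCondition.
An IterativeCondition gives you a context from which you can get the events already matched by the earlier patterns. You can iterate over the events matched for door_open, compare each one with the current close event (for example by door id), and accept the close event only if they belong to the same door.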
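A minimal sketch of such a pattern, reusing the DoorEvent and DataType classes from the question. The getId() getter on the door object is an assumption based on the XML structure shown in the question, and the wrapper class DoorPatterns is only there for illustration; adjust both to your actual classes.

```java
import java.util.Objects;

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

// DoorEvent and DataType are the asker's own classes (not part of Flink).
public class DoorPatterns {

    public static Pattern<String, ?> closedWithinFiveMinutes() {
        return Pattern.<String>begin("door_open").where(
                new SimpleCondition<String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public boolean filter(String doorevent) throws Exception {
                        DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML);
                        // Any "open" event may start a match.
                        return "open".equals(event.getDoor().getStatus());
                    }
                })
            .followedByAny("door_close").where(
                new IterativeCondition<String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public boolean filter(String doorevent, Context<String> ctx) throws Exception {
                        DoorEvent close = new DoorEvent().parseInstance(doorevent, DataType.XML);
                        if (!"close".equals(close.getDoor().getStatus())) {
                            return false;
                        }
                        // Look at the event(s) already accepted for "door_open" in this
                        // partial match and accept the close only for the same door id.
                        // getId() is an assumed getter; Objects.equals handles both
                        // primitive (boxed) and String ids.
                        for (String opened : ctx.getEventsForPattern("door_open")) {
                            DoorEvent open = new DoorEvent().parseInstance(opened, DataType.XML);
                            if (Objects.equals(open.getDoor().getId(), close.getDoor().getId())) {
                                return true;
                            }
                        }
                        return false;
                    }
                })
            .within(Time.minutes(5));
    }
}
```

With this condition nothing has to be stored in the door_open step: the open event is already part of the partial match, and the door_close condition reads it back through the context. Note that each event is parsed again inside the conditions; mapping the stream to DoorEvent objects before applying the pattern would avoid that.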
For more information on conditions, see the Conditions section of the Flink CEP documentation.