Stateful complex event processing with Apache Flink

I want to detect whether two events with the same identifier occur within a defined time frame. For example, a DoorEvent looks like this:

<doorevent>
  <door>
    <id>1</id>
    <status>open</status>
  </door>
  <timestamp>12345679</timestamp>
</doorevent> 

<doorevent>
  <door>
    <id>1</id>
    <status>close</status>
  </door>
  <timestamp>23456790</timestamp>
</doorevent>

My DoorEvent Java class in the example below has the same structure.

I want to detect when the door with id 1 closes within 5 minutes of opening. I am trying to use the Apache Flink CEP library for this purpose. The incoming stream contains all open and close messages from, let's say, 20 doors.

Pattern<String, ?> pattern = Pattern.<String>begin("door_open").where(
    new SimpleCondition<String>() {
        private static final long serialVersionUID = 1L;
        public boolean filter(String doorevent) {
            DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML);
            if (event.getDoor().getStatus().equals("open")){
                // save state of door as open
                return true;
            }
            return false;                           
        }
    }
)
.followedByAny("door_close").where(
    new SimpleCondition<String>() {
        private static final long serialVersionUID = 1L;
        public boolean filter(String doorevent) throws JsonParseException, JsonMappingException, IOException {
            DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML);
            if (event.getDoor().getStatus().equals("close")){
                // check if close is of previously opened door
                return true;
            }
            return false;
        }
    }
)
.within(Time.minutes(5));

How do I save the state of door 1 as open in the door_open step, so that in the door_close step I know that door 1 is the one being closed and not some other door?

Answer by Biplob Biswas (best answer)

If you are on Flink 1.3.0 or above, what you want to do is really straightforward.

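Your pattern would look something like this (it assumes the stream has already been parsed into DoorEvent objects rather than raw strings):

Pattern<DoorEvent, ?> pattern = Pattern.<DoorEvent>begin("first")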
        .where(new SimpleCondition<DoorEvent>() {
          private static final long serialVersionUID = 1390448281048961616L;

          @Override
          public boolean filter(DoorEvent event) throws Exception {
            return event.getDoor().getStatus().equals("open");
          }
        })
        .followedBy("second")
        .where(new IterativeCondition<DoorEvent>() {
          private static final long serialVersionUID = -9216505110246259082L;

          @Override
          public boolean filter(DoorEvent secondEvent, Context<DoorEvent> ctx) throws Exception {

            if (!secondEvent.getDoor().getStatus().equals("close")) {
              return false;
            }

            for (DoorEvent firstEvent : ctx.getEventsForPattern("first")) {
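              // Compare door ids; both sides must call the same getter
              // (getId() is assumed here from the <id> element in the question).
              if (secondEvent.getDoor().getId().equals(firstEvent.getDoor().getId())) {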
                return true;
              }
            }
            return false;
          }
        })
        .within(Time.minutes(5));

In short, use an IterativeCondition: its context gives you the events already accepted for the "first" pattern in the current partial match, so you can iterate over them and accept the close event only if its door id matches one of them.
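The check that the second condition performs can be tried out without a Flink cluster. The following is only a minimal plain-Java sketch of that logic, not Flink code: the `DoorMatchSketch` class, its simplified `DoorEvent` (a flat door id, status and timestamp), the `closesAnOpenDoor` helper and the assumption that timestamps are in milliseconds are all made up for illustration. The explicit time comparison merely stands in for what `within(Time.minutes(5))` does in the real pattern.

```java
import java.util.ArrayList;
import java.util.List;

class DoorMatchSketch {

    /** Hypothetical, flattened stand-in for the DoorEvent class in the question. */
    static final class DoorEvent {
        final String doorId;
        final String status;
        final long timestampMillis; // assumption: timestamps are milliseconds

        DoorEvent(String doorId, String status, long timestampMillis) {
            this.doorId = doorId;
            this.status = status;
            this.timestampMillis = timestampMillis;
        }
    }

    /** Stand-in for within(Time.minutes(5)) in the real pattern. */
    static final long WINDOW_MILLIS = 5 * 60 * 1000L;

    /**
     * Mirrors the iterative condition: accept a "close" event only if one of the
     * previously accepted "open" events belongs to the same door and is recent enough.
     */
    static boolean closesAnOpenDoor(DoorEvent candidate, List<DoorEvent> acceptedOpens) {
        // Cheap check first, before looking at the earlier events.
        if (!"close".equals(candidate.status)) {
            return false;
        }
        for (DoorEvent open : acceptedOpens) {
            boolean sameDoor = candidate.doorId.equals(open.doorId);
            long elapsed = candidate.timestampMillis - open.timestampMillis;
            boolean inWindow = elapsed >= 0 && elapsed <= WINDOW_MILLIS;
            if (sameDoor && inWindow) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<DoorEvent> opens = new ArrayList<>();
        opens.add(new DoorEvent("1", "open", 0L));
        opens.add(new DoorEvent("2", "open", 1_000L));

        // Door 1 closes after one minute: matches the earlier open of door 1.
        System.out.println(closesAnOpenDoor(new DoorEvent("1", "close", 60_000L), opens)); // true
        // Door 3 was never opened: no match.
        System.out.println(closesAnOpenDoor(new DoorEvent("3", "close", 60_000L), opens)); // false
        // Door 1 closes after six minutes: outside the window.
        System.out.println(closesAnOpenDoor(new DoorEvent("1", "close", 360_000L), opens)); // false
    }
}
```

In the Flink version, the list of earlier opens comes from `ctx.getEventsForPattern("first")` and the time limit is enforced by the pattern itself, so the condition only needs the door id comparison.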

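Note that IterativeConditions can be expensive: the call to ctx.getEventsForPattern(...) has to find all previously accepted events for the potential match, so do any cheap checks first (like the status check above) and keep its use to a minimum.

For more information on conditions, see the Conditions section of the Flink CEP documentation.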