Watermark fell far behind in Flink CEP

326 views Asked by At

I am using Flink CEP to detect patterns against events from Kafka. For simplicity, events only have one type. I am trying to detect the change in the value of a field in the continuous event stream. The code looks like the following

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.addSource(new FlinkKafkaConsumer[..]())
          .filter(...)
          .map(...)
          .assignTimestampsAndWatermarks(
            WatermarkStrategy.forMonotonousTimestamps[Event]().withTimestampAssigner(..)
          )
          .keyBy(...)(TypeInformation.of(classOf[...]))
    
val pattern: Pattern[Event, _] = 
          Pattern.begin[Event]("start", AfterMatchSkipStrategy.skipPastLastEvent()).times(1)
          .next("middle")
          .oneOrMore()
          .optional()
          .where(new IterativeCondition[Event] {
             override def filter(event: Event, ctx:...): Boolean = {
                 val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
                 startTrafficEvent.getFieldValue().equals(event.getFieldValue())
             }
          })
          .next("end").times(1)
          .where(new IterativeCondition[Event] {
             override def filter(event: Event, ctx:...): Boolean = {
                  val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
                  !startTrafficEvent.getFieldValue().equals(event.getFieldValue())
            }
          })
          .within(Time.seconds(30))

Kafka topic has 104 partitions, events are distributed evenly across the partitions. When I submitted the job, parallelism was set to 104.

From Web UI, there were 2 tasks: the first one is Source->filter->map->timestamp/watermark; the second one is CepOperator->sink. Each task got 104 parallelism.

The workload on subtasks was uneven, it should come from keyBy. Watermarks among subtasks were different, but they started to be stuck at a value, no change for a long time. From logs, I can see CEP kept evaluating events, and matched results being pushed to downstream sink.

The event rate was 10k/s, and the first task's backpressure kept high and the second one ok.

Please help explain what happened in CEP and how to fix the issue

Thanks

1

There are 1 answers

7
David Anderson On BEST ANSWER

Having given your question more careful consideration, I'm revising my answer.

It sounds like CEP is continuing to produce matches and they are being pushed to the sink, but the CEP+sink task is producing high backpressure. What would help is to identity the cause of the backpressure.

If events are available to read from all partitions, and yet the watermarks are only barely advancing, it sounds like the backpressure is severe enough to prevent events from being ingested at all.

I suspect

  1. a combinatorial explosion of effort in the CEP engine, and/or
  2. enough matches that the sink can't keep up

as likely causes.

A few ideas for getting more insight:

(1) Try using a profiler to determine if the CepOperator is the bottleneck, and perhaps identify what it is doing.

(2) Disable operator chaining between the CepOperator and the sink in order to isolate CEP -- simply as a debugging step. This will give you better visibility (via the metrics and backpressure monitoring) as to what CEP and the sink are each doing.

(3) Test this in a smaller setup, and expand the CEP logging.