Flink co group outer join fails with High Backpressure

I have two streams in Flink: stream1 receives 70,000 records per second, and stream2 may or may not have data.

// Ingest the High Frequency Analog Stream
SingleOutputStreamOperator<FlatHighFrequencyAnalog> stream1 =
    environment
        .addSource(createHFAConsumer())
        .name("hfa source");

SingleOutputStreamOperator<EVWindow> stream2 =
    environment
        .addSource(createHFDConsumer())
        .name("hfd source");

DataStream<Message> pStream =
    stream1
        .coGroup(stream2)
        .where(obj -> obj.getid())
        .equalTo(ev -> ev.getid())
        .window(TumblingEventTimeWindows.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
        .evictor(TimeEvictor.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
        .apply(new CalculateCoGroupFunction());

This works perfectly fine when both streams have data, but when stream2 has no data the job fails with very high backpressure. CPU utilization also spikes by 200%.

How do I handle an outer join in such a scenario?

There are 3 answers

Neha Jirafe On BEST ANSWER

Thanks to David Anderson for the pointers.

RCA:

The main issue arose when I created a tumbling window over my streams.

As per the Flink documentation:

In a nutshell, a window is created as soon as the first element that should belong to this window arrives, and the window is completely removed when the time (event or processing time) passes its end timestamp plus the user-specified allowed lateness

Since there was no incoming data for stream2, its watermark never advanced, so the windows holding stream1 data never fired and were never removed. As David pointed out:

Whenever multiple streams are connected, the resulting watermark is the minimum of the incoming watermarks

which means Flink kept buffering data from stream1 while waiting for stream2, eventually resulting in high backpressure and finally an OOM.
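The effect can be reproduced without Flink. The following is a minimal sketch in plain Java, not Flink's actual implementation: the class and method names are made up for illustration, and it assumes the usual event-time rule that a window [start, end) may fire once the watermark reaches end - 1.

```java
import java.util.Arrays;

public class MinWatermarkDemo {
    // An input that has not emitted any watermark yet is modeled as Long.MIN_VALUE.
    public static final long NO_WATERMARK = Long.MIN_VALUE;

    // With several inputs, the operator's watermark is the minimum across them.
    public static long combinedWatermark(long... inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(NO_WATERMARK);
    }

    // An event-time window [start, end) can fire once the watermark reaches end - 1.
    public static boolean windowCanFire(long windowEndMillis, long watermark) {
        return watermark >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 60_000L;             // a one-minute tumbling window
        long stream1Watermark = 300_000L;     // stream1 is five minutes into event time
        long stream2Watermark = NO_WATERMARK; // stream2 never produced any data

        long combined = combinedWatermark(stream1Watermark, stream2Watermark);

        // The silent input pins the combined watermark, so the window stays open
        // and its contents keep accumulating.
        System.out.println(windowCanFire(windowEnd, combined));         // false
        System.out.println(windowCanFire(windowEnd, stream1Watermark)); // true
    }
}
```

With stream1 alone the first window would have closed long ago; the minimum with a silent stream2 keeps every window open.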

The solution:

I created an external script that sends dummy heartbeat messages to the Kafka stream stream2 at the desired interval, and added logic in my application to ignore these messages during computation.

This forced both stream2 and stream1 to advance their watermarks, so the windows fired and were cleaned up.
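The "ignore these messages" part could look roughly like the sketch below. It is plain Java with a hypothetical Event class and heartbeat flag; the real message format and the way heartbeats are marked are not shown here, so treat every name as an assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class HeartbeatFilterDemo {
    // Hypothetical message shape: a heartbeat carries a timestamp but no payload.
    public static final class Event {
        public final String id;
        public final long timestampMillis;
        public final boolean heartbeat;

        public Event(String id, long timestampMillis, boolean heartbeat) {
            this.id = id;
            this.timestampMillis = timestampMillis;
            this.heartbeat = heartbeat;
        }
    }

    // Heartbeats still advance event time upstream; here they are dropped
    // just before the actual computation.
    public static List<Event> withoutHeartbeats(List<Event> windowContents) {
        return windowContents.stream()
                .filter(event -> !event.heartbeat)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Event> window = Arrays.asList(
                new Event("a", 1_000L, false),
                new Event("heartbeat", 2_000L, true),
                new Event("b", 3_000L, false));

        System.out.println(withoutHeartbeats(window).size()); // 2
    }
}
```

The filter has to run after timestamps and watermarks are assigned; dropping heartbeats before that point would leave stream2 silent again.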

David Anderson On

I believe the problem is that the lack of watermarks from the idle stream is holding back the overall watermark. Whenever multiple streams are connected, the resulting watermark is the minimum of the incoming watermarks. This can then lead to problems like the one you are experiencing.

You have a couple of options:

  1. Set the watermark for stream2 to be Watermark.MAX_WATERMARK, thereby giving stream1 complete control of watermarking.
  2. Somehow detect that stream2 is idle, and artificially advance the watermark despite the lack of events. Here is an example.
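Option 2 can be illustrated with a small plain-Java model. This is a sketch of the idea only, with made-up class names and a wall-clock idle timeout as an assumption; it is not Flink code. Recent Flink versions offer a similar mechanism through WatermarkStrategy.withIdleness(Duration), so check the documentation for your version before building your own.

```java
import java.util.Arrays;

public class IdleAwareWatermarkDemo {
    public static final long NO_WATERMARK = Long.MIN_VALUE;

    // Tracks one input: its latest watermark and the wall-clock time of its last event.
    public static final class Input {
        long watermark = NO_WATERMARK;
        long lastEventWallClockMillis;

        public Input(long createdAtMillis) {
            this.lastEventWallClockMillis = createdAtMillis;
        }

        public void onEvent(long eventTimestampMillis, long nowMillis) {
            watermark = Math.max(watermark, eventTimestampMillis);
            lastEventWallClockMillis = nowMillis;
        }

        public boolean isIdle(long nowMillis, long idleTimeoutMillis) {
            return nowMillis - lastEventWallClockMillis >= idleTimeoutMillis;
        }
    }

    // Minimum over the inputs that are still active; idle inputs are ignored.
    // If every input is idle, the watermark is held at NO_WATERMARK.
    public static long combinedWatermark(long nowMillis, long idleTimeoutMillis, Input... inputs) {
        return Arrays.stream(inputs)
                .filter(input -> !input.isIdle(nowMillis, idleTimeoutMillis))
                .mapToLong(input -> input.watermark)
                .min()
                .orElse(NO_WATERMARK);
    }

    public static void main(String[] args) {
        long idleTimeout = 10_000L;
        Input stream1 = new Input(0L);
        Input stream2 = new Input(0L);

        stream1.onEvent(300_000L, 1_000L); // stream2 stays silent

        // Before the timeout, stream2 still holds the watermark back.
        System.out.println(combinedWatermark(5_000L, idleTimeout, stream1, stream2) == NO_WATERMARK); // true
        // After the timeout, stream2 is ignored and stream1 drives the watermark.
        System.out.println(combinedWatermark(10_500L, idleTimeout, stream1, stream2)); // 300000
    }
}
```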

Dmytro Shulikov On

As discussed above:

Whenever multiple streams are connected, the resulting watermark is the minimum of the incoming watermarks

and

which means Flink kept buffering data from stream1 while waiting for stream2, eventually resulting in high backpressure and finally an OOM.

This applies to the coGroup() method of the DataStream<T> class, which returns CoGroupedStreams<T, T2>.

To avoid this behavior, we can use the union(DataStream<T>... streams) method, which returns a plain DataStream<T>. Assigning timestamps and watermarks after the union lets the watermark advance as in a single stream, driven by whichever input has data.

The only problem left to solve is that both streams need a common schema (class). We can use a wrapper class with two fields:

public class Aggregator {

  private FlatHighFrequencyAnalog flatHighFrequencyAnalog;
  private EVWindow evWindow;

  public Aggregator(FlatHighFrequencyAnalog flatHighFrequencyAnalog) {
    this.flatHighFrequencyAnalog = flatHighFrequencyAnalog;
  }

  public Aggregator(EVWindow evWindow) {
    this.evWindow = evWindow;
  }

  public FlatHighFrequencyAnalog getFlatHighFrequencyAnalog() {
    return flatHighFrequencyAnalog;
  }

  public EVWindow getEVWindow() {
    return evWindow;
  }
}

A more generic approach is to use the Either<L, R> class from org.apache.flink.types.

To summarize, we end up with:

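// Lambdas erase generic types, so give Flink an explicit type hint for Either.
TypeInformation<Either<EVWindow, FlatHighFrequencyAnalog>> eitherType =
    TypeInformation.of(new TypeHint<Either<EVWindow, FlatHighFrequencyAnalog>>() {});

// FlatHighFrequencyAnalog is the Right type of the Either.
SingleOutputStreamOperator<Either<EVWindow, FlatHighFrequencyAnalog>> stream1 =
    environment
        .addSource(createHFAConsumer())
        .map(hfa -> Either.<EVWindow, FlatHighFrequencyAnalog>Right(hfa))
        .returns(eitherType);

// EVWindow is the Left type of the Either.
SingleOutputStreamOperator<Either<EVWindow, FlatHighFrequencyAnalog>> stream2 =
    environment
        .addSource(createHFDConsumer())
        .map(hfd -> Either.<EVWindow, FlatHighFrequencyAnalog>Left(hfd))
        .returns(eitherType);

// Assign watermarks after the union so that an idle input cannot hold them back.
DataStream<Message> pStream =
    stream1
        .union(stream2)
        .assignTimestampsAndWatermarks(
            WatermarkStrategy
                .<Either<EVWindow, FlatHighFrequencyAnalog>>forBoundedOutOfOrderness(
                    Duration.ofSeconds(MAX_OUT_OF_ORDERNESS))
                .withTimestampAssigner((input, timestamp) ->
                    input.isLeft() ? input.left().getTimeStamp() : input.right().getTimeStamp()))
        .keyBy(value -> value.isLeft() ? value.left().getId() : value.right().getId())
        .window(TumblingEventTimeWindows.of(Time.minutes(MINUTES)))
        // Replace with your own ProcessWindowFunction subclass; the base class is abstract.
        .process(new ProcessWindowFunction());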

To split the window contents into separate collections inside the process function (Streams here is Guava's com.google.common.collect.Streams):

List<EVWindow> evWindows =
        Streams.stream(elements)
            .filter(Either::isLeft)
            .map(Either::left)
            .collect(Collectors.toList());

List<FlatHighFrequencyAnalog> highFrequencyAnalogs =
        Streams.stream(elements)
            .filter(Either::isRight)
            .map(Either::right)
            .collect(Collectors.toList());