I have two streams in Flink: `stream1` has about 70,000 records per second, and `stream2` may or may not have data.
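```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Ingest the High Frequency Analog stream (~70,000 records/sec)
SingleOutputStreamOperator<FlatHighFrequencyAnalog> stream1 = environment
    .addSource(createHFAConsumer())
    .name("hfa source");

// Ingest the second stream, which may or may not have data
SingleOutputStreamOperator<EVWindow> stream2 = environment
    .addSource(createHFDConsumer())
    .name("hfd source");

// Co-group both streams by id in tumbling event-time windows.
// An event-time window only fires once the combined watermark of both inputs passes its end.
DataStream<Message> pStream = stream1
    .coGroup(stream2)
    .where(obj -> obj.getid())
    .equalTo(ev -> ev.getid())
    .window(TumblingEventTimeWindows.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
    .evictor(TimeEvictor.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
    .apply(new CalculateCoGroupFunction());
```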
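For reference, a tumbling event-time window with a zero offset assigns each record to the window that starts at its timestamp rounded down to a multiple of the window size. The plain-Java sketch below mirrors that arithmetic; it is not Flink code, `windowStart`/`windowEnd` are hypothetical helpers, and the 1-minute size is an assumption standing in for `Constants.VALIDTY_WINDOW_MIN`.

```java
// Sketch (not Flink code): how a tumbling event-time window with zero offset
// maps a record timestamp (epoch milliseconds) to its window.
class TumblingWindowSketch {

    // Start of the window containing the timestamp: round down to a multiple of the size.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    // Windows are half-open intervals [start, start + size).
    static long windowEnd(long timestampMs, long windowSizeMs) {
        return windowStart(timestampMs, windowSizeMs) + windowSizeMs;
    }

    public static void main(String[] args) {
        long size = 60_000L; // assumed 1-minute window, standing in for VALIDTY_WINDOW_MIN
        long[] timestamps = {0L, 59_999L, 60_000L, 125_000L};
        for (long ts : timestamps) {
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Every `stream1` record therefore lands in some window, and that window's contents stay in state until the window fires.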
This works fine when both streams have data, but when `stream2` has no data the job fails with very high backpressure, and CPU utilization spikes by 200%.
How do I handle an outer join in such a scenario?
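Within a window, Flink calls a `CoGroupFunction` once per key with two iterables, and for a key present in only one input the other iterable is empty; that is what makes `coGroup` usable for an outer join. The plain-Java sketch below mimics that contract for a single window's contents. It is not the author's `CalculateCoGroupFunction`: `Left`, `Right`, `fullOuterJoin` and the string output format are hypothetical, and records require Java 16+. It only helps once a window actually fires, which is exactly what did not happen while `stream2` was silent.

```java
import java.util.*;

// Sketch (plain Java, not Flink): coGroup semantics within one window.
// For every key seen on EITHER side, coGroup receives both groups; a side
// with no elements for that key arrives as an empty list.
class CoGroupOuterJoinSketch {

    record Left(String id, double value) {}
    record Right(String id, String label) {}

    static List<String> fullOuterJoin(List<Left> lefts, List<Right> rights) {
        // Group each side by key (TreeMap keeps the output order deterministic).
        Map<String, List<Left>> leftById = new TreeMap<>();
        for (Left l : lefts) leftById.computeIfAbsent(l.id(), k -> new ArrayList<>()).add(l);
        Map<String, List<Right>> rightById = new TreeMap<>();
        for (Right r : rights) rightById.computeIfAbsent(r.id(), k -> new ArrayList<>()).add(r);

        // Union of keys from both sides.
        Set<String> keys = new TreeSet<>(leftById.keySet());
        keys.addAll(rightById.keySet());

        List<String> out = new ArrayList<>();
        for (String key : keys) {
            out.addAll(coGroup(key,
                    leftById.getOrDefault(key, List.of()),
                    rightById.getOrDefault(key, List.of())));
        }
        return out;
    }

    // Plays the role of CoGroupFunction.coGroup(first, second, out).
    static List<String> coGroup(String key, List<Left> ls, List<Right> rs) {
        List<String> out = new ArrayList<>();
        if (rs.isEmpty()) {                 // key only on the left side
            for (Left l : ls) out.add(key + ":" + l.value() + ":null");
        } else if (ls.isEmpty()) {          // key only on the right side
            for (Right r : rs) out.add(key + ":null:" + r.label());
        } else {                            // key on both sides: cross product
            for (Left l : ls) for (Right r : rs) out.add(key + ":" + l.value() + ":" + r.label());
        }
        return out;
    }

    public static void main(String[] args) {
        List<Left> lefts = List.of(new Left("a", 1.0), new Left("b", 2.0));
        List<Right> rights = List.of(new Right("b", "ev1"), new Right("c", "ev2"));
        fullOuterJoin(lefts, rights).forEach(System.out::println);
    }
}
```

With an empty right side, every left record is emitted with a `null` partner, which is the outer-join behaviour the question asks for.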
Thanks to David Anderson for the pointers.

RCA:

The main issue arose when I created a tumbling window over my streams.
As per the Flink documentation:
Since there was no incoming data for `stream2`, its watermark never advanced, so the window never fired. As David pointed out, this meant Flink kept buffering data from `stream1` while waiting for `stream2`, which eventually resulted in high backpressure and finally an OOM.

The Solution:
I created an external script that sends dummy heartbeat messages to the Kafka topic behind `stream2` at the desired interval, and added logic in my application to ignore these messages during computation. This forced the watermark of `stream2`, and with it the combined watermark of `stream1` and `stream2`, to advance, so the windows fired and their state was purged.
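To see why the heartbeats help, the plain-Java sketch below models a two-input event-time window operator. It is not Flink code, and its names (`HeartbeatWatermarkSketch`, `EvMessage`, the `heartbeat` flag) are hypothetical. It assumes in-order data, so each input's watermark is its latest timestamp minus 1, and it does not model late data. The operator's clock is the minimum of the two input watermarks, so while `stream2` is silent nothing fires and `stream1` records pile up; a single heartbeat on `stream2` advances that input's watermark, releasing every window it covers, while the heartbeat itself is never buffered or joined.

```java
import java.util.TreeMap;

// Sketch (plain Java, not Flink) of a two-input event-time window operator.
class HeartbeatWatermarkSketch {

    // Hypothetical stream2 message: real events and heartbeats share one topic.
    record EvMessage(String id, long timestampMs, boolean heartbeat) {}

    final long windowSizeMs;
    long watermark1 = Long.MIN_VALUE;   // latest watermark from stream1
    long watermark2 = Long.MIN_VALUE;   // latest watermark from stream2
    // window start -> number of buffered records (from both inputs) in that window
    final TreeMap<Long, Integer> pending = new TreeMap<>();
    int firedWindows = 0;
    int heartbeatsSeen = 0;

    HeartbeatWatermarkSketch(long windowSizeMs) { this.windowSizeMs = windowSizeMs; }

    // The operator's event-time clock is the minimum of its input watermarks.
    long combinedWatermark() { return Math.min(watermark1, watermark2); }

    int bufferedRecords() {
        int n = 0;
        for (int c : pending.values()) n += c;
        return n;
    }

    // Add a record to the count of its tumbling window.
    private void buffer(long timestampMs) {
        long start = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        pending.merge(start, 1, Integer::sum);
    }

    // stream1 record: buffered in its window, and it advances stream1's watermark.
    void onStream1(long timestampMs) {
        buffer(timestampMs);
        watermark1 = Math.max(watermark1, timestampMs - 1);
        fireReadyWindows();
    }

    // stream2 message: every message advances the watermark, only real events are buffered.
    void onStream2(EvMessage m) {
        if (m.heartbeat()) {
            heartbeatsSeen++;               // ignored for computation
        } else {
            buffer(m.timestampMs());
        }
        watermark2 = Math.max(watermark2, m.timestampMs() - 1);
        fireReadyWindows();
    }

    // Window [start, start + size) fires once the combined watermark reaches start + size - 1.
    private void fireReadyWindows() {
        long wm = combinedWatermark();
        while (!pending.isEmpty() && pending.firstKey() + windowSizeMs - 1 <= wm) {
            pending.pollFirstEntry();       // emit the co-grouped result, then purge the window state
            firedWindows++;
        }
    }

    public static void main(String[] args) {
        HeartbeatWatermarkSketch op = new HeartbeatWatermarkSketch(60_000L);
        for (long ts = 0; ts < 180_000L; ts += 1_000L) op.onStream1(ts);
        System.out.println("stream2 silent: fired=" + op.firedWindows + ", buffered=" + op.bufferedRecords());
        op.onStream2(new EvMessage("heartbeat", 120_000L, true));
        System.out.println("after heartbeat: fired=" + op.firedWindows + ", buffered=" + op.bufferedRecords());
    }
}
```

One design point carries over to Flink: heartbeats must reach the step that generates watermarks. Dropping them afterwards, in a filter after the source or inside the co-group function, is fine, because Flink forwards watermarks through operators independently of the records.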