High Flink network buffer usage, which causes Kafka lag


Our Flink job contains a filter, a keyBy on session ID, and then a session window with a 30-minute gap. The session window needs to accumulate all the events for the session and process them using a ProcessWindowFunction.
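To make the buffering concrete, here is a plain-Java sketch (not Flink code) of how the events for a single session ID end up grouped by a 30-minute gap. It assumes the timestamps for one key are already sorted, and it simplifies Flink's handling of an event that arrives exactly one gap after the previous one; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of event-time session windows for ONE key (one session ID).
// Not Flink code: it only mimics the idea that each event extends the session
// by the gap, and a new session starts once the gap between events is reached.
// Exact boundary handling in Flink may differ from this simplification.
class SessionGapSketch {

    static List<List<Long>> splitIntoSessions(List<Long> sortedTimestamps, long gap) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        Long previous = null;
        for (long ts : sortedTimestamps) {
            if (previous != null && ts - previous >= gap) {
                sessions.add(current);   // gap reached: the previous session closes here
                current = new ArrayList<>();
            }
            // With a plain ProcessWindowFunction, every event is kept in state
            // until its session closes, so long sessions mean long lists.
            current.add(ts);
            previous = ts;
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        long gapMinutes = 30;
        List<Long> minutes = List.of(0L, 10L, 45L, 50L, 100L);
        System.out.println(splitIntoSessions(minutes, gapMinutes)); // [[0, 10], [45, 50], [100]]
    }
}
```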

We are using Flink 1.9 with 128 containers and 20G of memory in total to run our job, and the cut-off ratio is 0.3. We use incremental checkpoints.

When the session windows start to fire and call the process function, network buffer usage gets quite high, and then the Kafka input starts lagging. Our settings:

state.backend: rocksdb
state.checkpoints.dir: hdfs://nameservice0/service
state.backend.rocksdb.memory.managed: true
state.backend.incremental: true
#https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
state.backend.rocksdb.memory.write-buffer-ratio: 0.6
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1
state.backend.rocksdb.block.blocksize: 16mb
state.backend.rocksdb.writebuffer.count: 8
state.backend.rocksdb.writebuffer.size: 256mb
state.backend.rocksdb.timer-service.factory: heap

containerized.heap-cutoff-ratio: 0.25
taskmanager.network.memory.fraction: 0.85
taskmanager.network.memory.min: 512mb
taskmanager.network.memory.max: 7168mb
taskmanager.network.memory.buffers-per-channel: 8
taskmanager.memory.segment-size: 4mb
taskmanager.network.memory.floating-buffers-per-gate: 16
taskmanager.network.netty.transport: poll

Some of the graphs: [3 images not available]

Any suggestion will be appreciated!


There are 2 answers

monstereo

I do not know Flink's internals, but the reason could be related to the session window. What I mean is that if you have many sessions with the same gap (30 minutes), they may all be processed at the same time, which can create a delay.

David Anderson

If I had access to the details, here's what I would look at to try to improve performance for this application:

(1) Could the windows be re-implemented to do incremental aggregation? Currently the windows are building up what may be rather long lists of events, and they only work through those lists when the sessions end. This apparently takes long enough to cause backpressure on Kafka. If you can pre-aggregate the session results, this will even out the processing, and the problem should go away.

And no, I'm not contradicting what I said here. If I haven't been clear, let me know.
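A minimal sketch of point (1), assuming the session result can be computed from a running summary. To keep it runnable without Flink, it declares a local interface with the same four methods as Flink's `AggregateFunction<IN, ACC, OUT>` (`createAccumulator`, `add`, `getResult`, `merge`); the summary used here (event count plus first and last timestamp) and all class names are hypothetical stand-ins for whatever the real ProcessWindowFunction computes.

```java
// Local stand-in with the same four methods as Flink's AggregateFunction<IN, ACC, OUT>,
// declared here only so the sketch compiles and runs without Flink.
interface IncrementalAggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical per-session summary: constant size, no matter how long the session is.
final class SessionStats {
    long count = 0;
    long firstTs = Long.MAX_VALUE;
    long lastTs = Long.MIN_VALUE;
}

// Input is just the event timestamp (an assumption for the sketch).
final class SessionStatsAggregate implements IncrementalAggregate<Long, SessionStats, String> {

    public SessionStats createAccumulator() {
        return new SessionStats();
    }

    // Called once per event as it arrives, so the work is spread over the session's lifetime.
    public SessionStats add(Long timestamp, SessionStats acc) {
        acc.count++;
        acc.firstTs = Math.min(acc.firstTs, timestamp);
        acc.lastTs = Math.max(acc.lastTs, timestamp);
        return acc;
    }

    // Called when the session closes: O(1), no list of events to walk through.
    // A session always contains at least one event, so firstTs/lastTs are set.
    public String getResult(SessionStats acc) {
        return "count=" + acc.count + " durationMs=" + (acc.lastTs - acc.firstTs);
    }

    // Needed because session windows are merging windows: two partial sessions
    // can be joined when a new event bridges the gap between them.
    public SessionStats merge(SessionStats a, SessionStats b) {
        a.count += b.count;
        a.firstTs = Math.min(a.firstTs, b.firstTs);
        a.lastTs = Math.max(a.lastTs, b.lastTs);
        return a;
    }
}

class IncrementalAggregationDemo {
    public static void main(String[] args) {
        SessionStatsAggregate agg = new SessionStatsAggregate();
        SessionStats acc = agg.createAccumulator();
        for (long ts : new long[] {1_000L, 4_000L, 9_000L}) {
            acc = agg.add(ts, acc);
        }
        System.out.println(agg.getResult(acc)); // count=3 durationMs=8000
    }
}
```

In the actual job, a class implementing Flink's `AggregateFunction` could be passed together with the existing window function as `.aggregate(aggregateFunction, processWindowFunction)` on the windowed stream. The ProcessWindowFunction then receives a single pre-aggregated element per session (plus the window metadata) instead of the full event list.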

(2) You've put a lot of extra network buffering in place. This is usually counter-productive; you want the backpressure to ripple back quickly and throttle the source, rather than pushing more data into Flink's network buffers.

You would do better to reduce the network buffering, and if possible, use your available resources to provide more slots instead. Having more slots will reduce the impact when one slot is busy working through the contents of a long session that just ended. Giving more memory to RocksDB might help too.
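To put rough numbers on point (2): in Flink 1.9's credit-based network stack, each input channel of a receiving task gets `buffers-per-channel` exclusive buffers, each input gate gets `floating-buffers-per-gate` shared buffers, and every buffer is one memory segment of `taskmanager.memory.segment-size`. The sketch below estimates the memory for one input gate of the session-window operator with the settings from the question versus the 1.9 defaults (2 buffers per channel, 8 floating buffers per gate, 32 KB segments). The upstream parallelism of 128 is an assumption, since the question does not state it, and output-side buffers are ignored. (The question also raises `taskmanager.network.memory.fraction` to 0.85; the 1.9 default is 0.1.)

```java
// Back-of-the-envelope estimate for ONE input gate of the keyed (session window) operator.
// Model: exclusive buffers per channel + floating buffers per gate, each one segment in size.
class NetworkBufferEstimate {

    static long inputGateBytes(int upstreamParallelism, int buffersPerChannel,
                               int floatingPerGate, long segmentSizeBytes) {
        // After a keyBy, every upstream subtask is one input channel of this gate.
        long buffers = (long) upstreamParallelism * buffersPerChannel + floatingPerGate;
        return buffers * segmentSizeBytes;
    }

    public static void main(String[] args) {
        final long kb = 1024L;
        final long mb = 1024L * kb;
        int upstreamParallelism = 128; // ASSUMPTION: the real parallelism is not stated

        long configured = inputGateBytes(upstreamParallelism, 8, 16, 4 * mb); // settings from the question
        long defaults = inputGateBytes(upstreamParallelism, 2, 8, 32 * kb);   // Flink 1.9 defaults

        System.out.println("configured: " + configured / mb + " MB"); // configured: 4160 MB
        System.out.println("defaults:   " + defaults / kb + " KB");   // defaults:   8448 KB
    }
}
```

Under these assumptions a single keyed input gate could hold about 4 GB of in-flight data instead of roughly 8 MB, so a large amount of data can queue up inside Flink before backpressure reaches the Kafka source.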

(3) See if you can optimize serialization. There can be a 10x difference in throughput between the best and worst serializers. See Flink Serialization Tuning. If there are any fields in the records that you don't actually need, drop them.
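For point (3), a sketch of dropping unused fields right after the source, assuming the session logic only needs the session ID and an event timestamp. The `RawEvent` fields are invented for illustration. The slim type follows Flink's POJO rules (public no-arg constructor, public fields) so that Flink can use its POJO serializer instead of falling back to Kryo; in a real job it should be a public top-level class in its own file.

```java
import java.util.Map;

// Hypothetical wide record as it might arrive from Kafka (field names are invented).
class RawEvent {
    String sessionId;
    long timestamp;
    String userAgent;
    Map<String, String> headers;
    byte[] payload;

    RawEvent(String sessionId, long timestamp, String userAgent,
             Map<String, String> headers, byte[] payload) {
        this.sessionId = sessionId;
        this.timestamp = timestamp;
        this.userAgent = userAgent;
        this.headers = headers;
        this.payload = payload;
    }
}

// Slim record shaped for Flink's POJO rules: public no-arg constructor and public fields.
// (Flink also requires the class itself to be public; kept package-private here only
// so the sketch fits in one file.)
class SlimEvent {
    public String sessionId;
    public long timestamp;

    public SlimEvent() {}

    public SlimEvent(String sessionId, long timestamp) {
        this.sessionId = sessionId;
        this.timestamp = timestamp;
    }

    // Projection to apply before keyBy, so only these two fields are
    // serialized across the network and stored in RocksDB.
    static SlimEvent fromRaw(RawEvent raw) {
        return new SlimEvent(raw.sessionId, raw.timestamp);
    }
}
```

Calling `env.getConfig().disableGenericTypes()` makes Flink throw an exception whenever a type would be serialized with Kryo, which is a quick way to find types that miss the POJO rules.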

(4) Look at tuning RocksDB. Make sure you are using the fastest available local disks for RocksDB, such as local SSDs. Avoid using network-attached storage (such as EBS) for state.backend.rocksdb.localdir.