Our Flink job contains a filter, a keyBy on session id, and then a session window with a 30-minute gap. The session window needs to accumulate all events for the session and process them with a ProcessWindowFunction.
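For reference, a minimal sketch of the pipeline described above (Event, sessionId, and the class names are placeholders rather than our actual code; the Kafka source and timestamp/watermark assignment are replaced with stand-ins):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class SessionJobSketch {

    // Placeholder event type; the real job reads these from Kafka.
    public static class Event {
        public String sessionId;
        public boolean relevant;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in source; the real job uses a FlinkKafkaConsumer plus watermark assignment.
        DataStream<Event> events = env.fromElements(new Event());

        events
            .filter(e -> e.relevant)                                       // the filter step
            .keyBy(e -> e.sessionId)                                       // key by session id
            .window(EventTimeSessionWindows.withGap(Time.minutes(30)))     // 30-minute session gap
            .process(new ProcessWindowFunction<Event, String, String, TimeWindow>() {
                @Override
                public void process(String sessionId, Context ctx,
                                    Iterable<Event> buffered, Collector<String> out) {
                    // All events of a session stay in (RocksDB) state until the gap
                    // elapses, then are handed to this function in one call.
                    int count = 0;
                    for (Event e : buffered) {
                        count++;
                    }
                    out.collect(sessionId + ": " + count);
                }
            })
            .print();

        env.execute("session-window-sketch");
    }
}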
We are using Flink 1.9, running the job on 128 containers with 20 GB of memory in total, and the cut-off ratio is 0.3. We are using incremental checkpoints.
When the session windows start to trigger the process function, network buffer usage gets very high, and then we start to see Kafka input lag.
Our settings:
state.backend: rocksdb
state.checkpoints.dir: hdfs://nameservice0/service
state.backend.rocksdb.memory.managed: true
state.backend.incremental: true
#https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
state.backend.rocksdb.memory.write-buffer-ratio: 0.6
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1
state.backend.rocksdb.block.blocksize: 16mb
state.backend.rocksdb.writebuffer.count: 8
state.backend.rocksdb.writebuffer.size: 256mb
state.backend.rocksdb.timer-service.factory: heap
containerized.heap-cutoff-ratio: 0.25
taskmanager.network.memory.fraction: 0.85
taskmanager.network.memory.min: 512mb
taskmanager.network.memory.max: 7168mb
taskmanager.network.memory.buffers-per-channel: 8
taskmanager.memory.segment-size: 4mb
taskmanager.network.memory.floating-buffers-per-gate: 16
taskmanager.network.netty.transport: poll
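Not part of the original settings, but as a back-of-the-envelope illustration of what the network values above add up to (plain arithmetic on the configured numbers, not Flink's exact internal sizing logic):

public class NetworkBufferMath {
    public static void main(String[] args) {
        long networkMax  = 7168L * 1024 * 1024;  // taskmanager.network.memory.max
        long segmentSize = 4L * 1024 * 1024;     // taskmanager.memory.segment-size
        int  perChannel  = 8;                    // ...buffers-per-channel (exclusive)
        int  perGate     = 16;                   // ...floating-buffers-per-gate

        long maxSegments        = networkMax / segmentSize;                     // 1792 buffers of 4 MB
        long pinnedPerChannelMb = perChannel * segmentSize / (1024 * 1024);     // 32 MB

        System.out.println("max network buffers per TM:   " + maxSegments);
        System.out.println("exclusive memory per channel: " + pinnedPerChannelMb + " MB");
        System.out.println("floating buffers per gate:    " + perGate);
    }
}

With a 4 MB segment size, each channel alone can pin 32 MB of exclusive buffers, so the per-channel cost of these settings is much higher than with the default 32 KB segment size.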
Any suggestions will be appreciated!
I do not know the internals of Flink, but the reason could be related to the session windows. What I mean is: if you have many sessions with the same gap (30 minutes), a large number of session windows can fire at the same time, and processing them all at once can create a delay.
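If that is the cause, one common mitigation (not from the original post, just the standard incremental-aggregation pattern; CountAggregate is a hypothetical name and Event is the placeholder type from the sketch above) is to combine an AggregateFunction with the ProcessWindowFunction, so that only a small accumulator is kept per session and very little work is left for the moment the windows fire. This only applies if the per-session logic can be expressed incrementally rather than needing the full list of events:

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class PreAggregatedSessionSketch {

    // Counts events per session incrementally; only the Long accumulator is kept
    // in RocksDB state instead of every buffered event.
    public static class CountAggregate
            implements AggregateFunction<SessionJobSketch.Event, Long, Long> {
        @Override public Long createAccumulator()                      { return 0L; }
        @Override public Long add(SessionJobSketch.Event e, Long acc)  { return acc + 1; }
        @Override public Long getResult(Long acc)                      { return acc; }
        @Override public Long merge(Long a, Long b)                    { return a + b; } // required for session windows
    }

    public static DataStream<String> build(DataStream<SessionJobSketch.Event> events) {
        return events
            .keyBy(e -> e.sessionId)
            .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
            // The ProcessWindowFunction now only receives the pre-aggregated value,
            // so many sessions firing at once is far cheaper than iterating raw events.
            .aggregate(new CountAggregate(),
                       new ProcessWindowFunction<Long, String, String, TimeWindow>() {
                @Override
                public void process(String sessionId, Context ctx,
                                    Iterable<Long> aggregated, Collector<String> out) {
                    out.collect(sessionId + ": " + aggregated.iterator().next());
                }
            });
    }
}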