Checkpointing issues in Flink 1.10.1 using RocksDB state backend


We are experiencing a very difficult-to-observe problem with our Flink job.

The job is reasonably simple; it:

  1. Reads messages from Kinesis using the Flink Kinesis connector
  2. Keys the messages and distributes them to ~30 different CEP operators, plus a couple of custom WindowFunctions
  3. The messages emitted from the CEP/Windows are forwarded to a SinkFunction that writes messages to SQS
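A stripped-down sketch of the topology (the stream name, key selector, pattern, and sink class below are placeholders, not our real code) looks roughly like this:

// Illustrative sketch only; names and the trivial pattern are placeholders.
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

public class JobSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);

        Properties consumerConfig = new Properties();
        consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "eu-west-1");

        // 1. Read messages from Kinesis
        DataStream<String> messages = env.addSource(
                new FlinkKinesisConsumer<>("input-stream", new SimpleStringSchema(), consumerConfig));

        // 2. Key the messages and apply a CEP pattern (the real job has ~30 of these,
        //    plus a couple of custom WindowFunctions)
        Pattern<String, String> pattern = Pattern.<String>begin("start")
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String value) {
                        return value.contains("trigger");
                    }
                });

        DataStream<String> matches = CEP.pattern(messages.keyBy(String::length), pattern)
                .select(new PatternSelectFunction<String, String>() {
                    @Override
                    public String select(Map<String, List<String>> match) {
                        return match.get("start").get(0);
                    }
                });

        // 3. Forward the matches to a stateless, non-rich SQS sink
        matches.addSink(new SqsSink());

        env.execute("kinesis-cep-sqs");
    }

    /** Stand-in for our real SinkFunction that writes to SQS. */
    private static class SqsSink implements SinkFunction<String> {
        @Override
        public void invoke(String value, Context context) {
            // the real sink publishes `value` to SQS here
        }
    }
}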

We are running Flink 1.10.1 on Fargate, using 2 containers with 4 vCPUs/8 GB each, and we are using the RocksDB state backend with the following configuration:

state.backend: rocksdb
state.backend.async: true
state.backend.incremental: false
state.backend.rocksdb.localdir: /opt/flink/rocksdb
state.backend.rocksdb.ttl.compaction.filter.enabled: true
state.backend.rocksdb.files.open: 130048

The job runs with a parallelism of 8.

When the job starts from cold, it uses very little CPU and checkpoints complete in about 2 seconds. Over time, the checkpoint sizes increase, but the checkpoint times remain a very reasonable couple of seconds:

[chart: checkpoint size and checkpoint time]

During this time we can observe the CPU usage of our TaskManagers gently growing for some reason:

[chart: TaskManager CPU over time]

Eventually, the checkpoint times start spiking to a few minutes, and then checkpoints just start repeatedly timing out (at the 10-minute timeout). At this point:

  • Checkpoint size (when it does complete) is around 60MB
  • CPU usage is high, but not 100% (usually around 60-80%)
  • Looking at in-progress checkpoints, usually 95%+ of operators complete the checkpoint within 30 seconds, but a handful will just stick and never complete. The SQS sink is always among them, even though the SinkFunction is not rich and has no state.
  • The backpressure monitor reports HIGH backpressure for these operators

Eventually this situation resolves in one of two ways:

  1. Enough checkpoints fail to trigger the job to fail due to a failed checkpoint proportion threshold
  2. The checkpoints eventually start succeeding, but never go back down to the 5-10s they take initially (when the state size is more like 30MB vs. 60MB)

We are really at a loss as to how to debug this. Our state seems very small compared to the kind of state described in other questions here. Our volumes are also pretty low; we are very often under 100 records/sec.

We'd really appreciate any input on areas we could look into to debug this.

Thanks,


There are 2 answers

David Anderson

A few points:

It's not unusual for state to gradually grow over time. Perhaps your key space is growing, and you are keeping some state for each key. If you are relying on state TTL to expire stale state, perhaps it is not configured in a way that allows it to clean up expired state as quickly as you would expect. It's also relatively easy to inadvertently create CEP patterns that need to keep some state for a very long time before certain possible matches can be ruled out.
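For reference, state TTL is configured per state descriptor rather than globally. A minimal sketch (the 24-hour TTL, the cleanup interval, and the surrounding function are placeholders to adapt to your job) might look like:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class TtlExample extends RichFlatMapFunction<String, String> {

    private transient ValueState<String> lastSeen;

    @Override
    public void open(Configuration parameters) {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.hours(24))                       // expire entries 24h after the last write (placeholder value)
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .cleanupInRocksdbCompactFilter(1000)              // re-check TTL timestamps every 1000 entries during compaction
                .build();

        ValueStateDescriptor<String> descriptor =
                new ValueStateDescriptor<>("last-seen", String.class);
        descriptor.enableTimeToLive(ttlConfig);
        lastSeen = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        lastSeen.update(value);
        out.collect(value);
    }
}

Note that TTL only applies to state you declare yourself; the CEP operators manage their own internal state, which is typically bounded by each pattern's within() clause.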

A good next step would be to identify the cause of the backpressure. The most common cause is that a job doesn't have adequate resources. Most jobs gradually come to need more resources over time, as the number of users (for example) being managed rises. For example, you might need to increase the parallelism, or give the instances more memory, or increase the capacity of the sink(s) (or the speed of the network to the sink(s)), or give RocksDB faster disks.

Besides inadequate provisioning, other causes of backpressure include:

  • blocking i/o is being done in a user function
  • a large number of timers are firing simultaneously
  • event time skew between different sources is causing large amounts of state to be buffered
  • data skew (a hot key) is overwhelming one subtask or slot
  • lengthy GC pauses
  • contention for critical resources (e.g., using a NAS as the local disk for RocksDB)

Enabling RocksDB native metrics might provide some insight.
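For example, some of the native metrics can be turned on in flink-conf.yaml (they are all disabled by default, since collecting them has some overhead); the selection below is just a suggestion:

state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.estimate-live-data-size: true
state.backend.rocksdb.metrics.num-running-compactions: true
state.backend.rocksdb.metrics.estimate-pending-compaction-bytes: true
state.backend.rocksdb.metrics.mem-table-flush-pending: true

Watching the estimated key counts, live data size, and pending compactions over time should show whether RocksDB itself is gradually getting busier as the CPU usage grows.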

monstereo

Add this property to your configuration:

state.backend.rocksdb.checkpoint.transfer.thread.num: {threadNumberAccordingYourProjectSize}

If you do not add this, it defaults to 1.

Link: https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java#L62