Summary
We are using Flink 1.15.1 and have long-running, stateful Flink jobs ingesting data from Kafka topics. They are configured to write checkpoints using the RocksDB state backend, with the checkpoints stored on S3.
We have noticed that the S3 "folders" for some checkpoints sometimes get very large and keep growing. They are also larger than the checkpoint sizes Flink reports in the UI. When the associated Flink jobs are cancelled, some files are deleted, but not all of them.
This seems to happen especially when Flink struggles to complete checkpoints within the configured timeout.
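For reference, the checkpointing setup of these jobs looks roughly like the sketch below. The interval, timeout, bucket name, and the use of incremental RocksDB checkpoints are placeholders rather than our exact configuration:

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB state backend; "true" enables incremental checkpoints (assumption)
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        // Periodic exactly-once checkpoints written to S3 (placeholder bucket and intervals)
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/flink-checkpoints");
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60_000L);

        // ... Kafka source, stateful operators, sinks ...

        env.execute("stateful-kafka-job");
    }
}
```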
Experiment
I tried to reproduce the problem by submitting a job with a very short checkpoint timeout and a continuous stream of input data. The setup looked roughly like the sketch below.
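This is a minimal sketch of the experiment's checkpointing configuration; the interval, timeout, bucket name, and job name are illustrative values, not the exact ones I used:

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TightTimeoutExperiment {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Same RocksDB backend and S3 checkpoint storage as the real jobs (placeholder bucket)
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/flink-checkpoint-test");

        // Checkpoint interval and timeout chosen so that checkpoints start
        // timing out once enough state has accumulated
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE); // every 10 s
        env.getCheckpointConfig().setCheckpointTimeout(5_000L);           // give up after 5 s

        // ... continuous source feeding a stateful operator ...

        env.execute("checkpoint-timeout-experiment");
    }
}
```

Here is what I observed: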
- Initially, the checkpoints were successful: new `chk-<checkpoint id>` folders were created and regularly replaced the previous one. So, at any given point in time, I had one `chk-<id>` folder containing a `_metadata` file and two other files. Those two other files kept growing as the job consumed more and more data.
- At some point, the checkpoints started to time out, as expected. As I kept the job running like this for a while, I noticed that some older `chk-<id>` folders were not removed, even though more recent `chk-<id>` folders existed. Those folders were not empty. However, the only `_metadata` file I found did not reference them, while it did reference other checkpoint files from the same folder. So they look like uncollected garbage.
- I stopped the data source. The job started writing checkpoints successfully again, but the old uncollected files were still there.
- I removed some of the files that I thought were garbage, and the job kept running without complaining.
- I cancelled the job. The `chk-<id>` folder that seemed valid was deleted, but the remaining `chk-<id>` folders that seemed problematic were still present.
Any idea what could be the problem here? How can I force Flink to remove files that are no longer part of its checkpoints?