Flink SQL job throws "No space left on device" exception although sufficient space was available


Flink version: 1.17.1
Environment: EKS, Flink Kubernetes Operator

We run a Flink SQL job with the RocksDB state backend; it has less than 10 GB of state in its checkpoint. We changed the instance type from m5d.2xlarge to r5d.xlarge, and the only change in the Flink YAML was the TaskManager CPU, which went from 4 to 3.

After this change the application was unable to start from a savepoint and complained about disk space. We tried creating small files in the /opt folder, and that worked. In the end we started the job from the last stable checkpoint instead of the savepoint, and the job started.

Question: What other issues could cause a "No space left on device" error, given that in this case there appeared to be sufficient space?

Exception:

2024-01-08 17:16:50,402 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Caught unexpected exception.
org.rocksdb.RocksDBException: While open a file for appending: /opt/flink/rocksdb/job_7b2937ad2b8a0189e1b27c0103408fc1_op_StreamingJoinOperator_9a0d21fd6364c562c773029964ae8006__2_8__uuid_5f4af36e-57bf-4a4e-b8c2-cc5f628ad226/db/000074.log: No space left on device
at org.rocksdb.RocksDB.write0(Native Method) ~[flink-dist-1.17.1.jar:1.17.1]
at org.rocksdb.RocksDB.write(RocksDB.java:1784) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper.flush(RocksDBWriteBatchWrapper.java:116) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper.flushIfNeeded(RocksDBWriteBatchWrapper.java:138) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper.put(RocksDBWriteBatchWrapper.java:99) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:153) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.applyRestoreResult(RocksDBFullRestoreOperation.java:127) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:102) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:329) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:512) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:99) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:256) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) [flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) [flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.17.1.jar:1.17.1]
at java.lang.Thread.run(Unknown Source) [?:?]

There is 1 answer

Answer by David Anderson

When you restart a job from a savepoint, the RocksDB state backend has to recreate the database from scratch, which involves recreating all of the SST files. When restarting from a checkpoint, by contrast, the checkpoint already contains the necessary SST files, and Flink can just use them.
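Because a savepoint restore rewrites the whole database on local disk, it may temporarily need more room than the checkpointed state size suggests. The following is a minimal Python sketch, not part of Flink, for checking the headroom of the volume that holds the RocksDB working directory before a restore. The helper names `disk_headroom` and `can_fit` and the `safety_factor` of 2 are illustrative assumptions rather than documented Flink requirements. The sketch also looks at free inodes, since a filesystem can report "No space left on device" when it runs out of inodes even though free bytes remain.

```python
import os


def disk_headroom(path):
    """Return free bytes and free inodes for the filesystem that holds `path`.

    Uses os.statvfs, which is available on Unix-like systems only.
    """
    st = os.statvfs(path)
    return {
        # Space available to unprivileged processes, in bytes.
        "free_bytes": st.f_bavail * st.f_frsize,
        # Inodes available to unprivileged processes.
        "free_inodes": st.f_favail,
        # Total inodes; some filesystems report 0 here (no fixed inode table).
        "total_inodes": st.f_files,
    }


def can_fit(path, needed_bytes, safety_factor=2.0):
    """Rough check that `needed_bytes` of state can be rebuilt under `path`.

    `safety_factor` is a hypothetical margin for transient files written
    while the database is rebuilt; tune it for your own workload.
    """
    info = disk_headroom(path)
    bytes_ok = info["free_bytes"] >= needed_bytes * safety_factor
    # Only treat inodes as a constraint when the filesystem reports a limit.
    inodes_ok = info["total_inodes"] == 0 or info["free_inodes"] > 0
    return bytes_ok and inodes_ok
```

For the job in the question, one might call `can_fit("/opt/flink/rocksdb", 10 * 1024**3)` inside the TaskManager container, using the directory from the stack trace and the roughly 10 GB state size mentioned above. A check made after a failed restore may look healthier than the disk did at the moment of failure if the partially written files have since been cleaned up.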