Flink Gelly Memory ran out


I run a cluster with the following specifications (per task manager):
- 16 CPU threads
- 16 GB RAM
- 16 slots

I have two task managers, and when we run a graph algorithm such as connected components, the program fails with the following error:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 29 maxPartition: 30 number of overflow segments: 105 bucketSize: 234 Overall memory: 42598400 Partition memory: 30539776 Message: null
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
    at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
    at com.asha.adw.ga.gpe.main.Main.main(Main.java:207)
Caused by: java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 29 maxPartition: 30 number of overflow segments: 105 bucketSize: 234 Overall memory: 42598400 Partition memory: 30539776 Message: null
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:457)
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:405)
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:316)
    at org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:228)
    at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:291)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)

I monitored the heap, and its utilization is only about 40%. I also set off-heap memory to true, but no luck. When I increase the main memory to 64 GB, the job runs successfully. Any help would be highly appreciated.
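For reference, a flink-conf.yaml sketch of the setup described above (the values match the question; the off-heap switch is the setting I tried):

```yaml
# Sketch of the TaskManager configuration described in the question
taskmanager.heap.mb: 16384            # 16 GB per task manager
taskmanager.numberOfTaskSlots: 16     # 16 slots per task manager
taskmanager.memory.off-heap: true     # off-heap attempt that did not help
```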


1 Answer

Answered by Janukowitsch

Your exception indicates that you ran out of Flink managed memory. You can control the fraction of the total available memory used as managed memory via taskmanager.memory.fraction. The default is 0.7, which means that ~70% of the available heap space (determined by taskmanager.heap.mb) is used as managed memory. So you could try to increase this value. The remaining fraction of ~0.3 is mainly used for user-defined functions.
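As a back-of-the-envelope sketch of that fraction (a simplified model that ignores JVM overhead and network buffers), the managed-memory budget for the 16 GB setup in the question would be roughly:

```python
# Rough managed-memory estimate: heap size times taskmanager.memory.fraction.
# Simplified model; real Flink deducts network buffers and other overhead first.
heap_mb = 16 * 1024   # taskmanager.heap.mb from the question's setup
fraction = 0.7        # taskmanager.memory.fraction default
managed_mb = heap_mb * fraction
print(managed_mb)     # prints 11468.8
```

Raising the fraction (e.g. to 0.8) grows this budget without changing the heap size.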

I just had the same problem and found the reasoning here: Gelly ran out of memory

I was able to validate the response in my project.

Hope the answer is not too late!