I set up a Flink cluster with the following configuration:
The number of task managers: 2
When I run the Connected Components algorithm (org.apache.flink.graph.library.ConnectedComponents) on a graph with 4 million edges and 1,750,000 vertices with parallelism 2, I get this exception:
java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: The record exceeds the maximum size of a sort buffer (current maximum: 87949312 bytes).
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1108)
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:82)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: The record exceeds the maximum size of a sort buffer (current maximum: 87949312 bytes).
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.io.IOException: The record exceeds the maximum size of a sort buffer (current maximum: 87949312 bytes).
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:955)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Given my configuration, I do not understand why it reports "The record exceeds the maximum size of a sort buffer (current maximum: 87949312 bytes)" when there are enough memory segments available.
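As a sanity check on the number in the message, here is a small sketch that converts the reported limit into memory segments. It assumes a segment size of 32 KiB (32768 bytes), which I believe is Flink's default; that value is an assumption and is not taken from my configuration. The class and method names are made up for illustration only.

```java
public class SortBufferCheck {
    // Assumed segment size (32 KiB). This is an assumption, not a value
    // read from the cluster configuration.
    static final long ASSUMED_SEGMENT_SIZE_BYTES = 32L * 1024L;

    // Number of whole segments that fit into the given buffer size.
    static long fullSegments(long bufferBytes, long segmentBytes) {
        return bufferBytes / segmentBytes;
    }

    // Bytes left over after filling whole segments.
    static long leftoverBytes(long bufferBytes, long segmentBytes) {
        return bufferBytes % segmentBytes;
    }

    // Convert a byte count to mebibytes.
    static double toMiB(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        long reportedMax = 87_949_312L; // "current maximum" from the exception message

        System.out.println("segments: "
                + fullSegments(reportedMax, ASSUMED_SEGMENT_SIZE_BYTES)); // segments: 2684
        System.out.println("leftover bytes: "
                + leftoverBytes(reportedMax, ASSUMED_SEGMENT_SIZE_BYTES)); // leftover bytes: 0
        System.out.println("MiB: " + toMiB(reportedMax)); // MiB: 83.875
    }
}
```

Under that assumption the limit corresponds to exactly 2684 segments (about 83.9 MiB) for one sort buffer, so the message seems to be about a single record not fitting into one buffer rather than about the total number of segments.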
Does anybody know how to solve this problem?