PySpark error: java.lang.OutOfMemoryError: GC overhead limit exceeded

62 views Asked by At

I'm running PySpark application on local mode, with driver-memory set to 14g (installed RAM is 16g)

I have two dataframes, ve (227 kb, 17,384 row), and e (2671 kb, 139,159 row)

ve = spark.read.options(header='True', delimiter=',').csv(r"vertix.csv")
e = spark.read.options(header='True', delimiter=',').csv(r"edges.csv")

I created a graphframe, and looped through the vertices (17,384 element) to calculate bfs. the results should be appended to f_df dataframe to be used later.

f_df = spark.createDataFrame([], schm)

#create graphframe
g = GraphFrame(ve, e)

v_list = ve.dropDuplicates(['id']).select(collect_list('id')).first()[0] #list of vertices

for i in v_list:
  bfs_paths = g.bfs('id = "7273"', 'id = "'+i+'"', maxPathLength=15) # loop destination
  if len(bfs_paths.columns) > 2:
      paths = bfs_paths.withColumn("path",concat(array(*bfs_paths.columns[1::2]))).select('path')
      f_df = f_df.union(paths)

The error appear after ~400 loop. and I'm not sure how to fix this problem. since I need a dataframe that contains all bfs from that certain point.

Error:

py4j.protocol.Py4JJavaError: An error occurred while calling o11452.run.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 25555.0 failed 1 times, most recent failure: Lost task 7.0 in stage 25555.0 (TID 20280) (DESKTOP-DQERUUR executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.util.LinkedList.linkLast(LinkedList.java:142)
        at java.util.LinkedList.add(LinkedList.java:338)
        at org.apache.spark.sql.execution.BufferedRowIterator.append(BufferedRowIterator.java:73)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage32.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:779)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:168)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2486/867138882.apply(Unknown Source)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.util.LinkedList.linkLast(LinkedList.java:142)
        at java.util.LinkedList.add(LinkedList.java:338)
        at org.apache.spark.sql.execution.BufferedRowIterator.append(BufferedRowIterator.java:73)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage32.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:779)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:168)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2486/867138882.apply(Unknown Source)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
0

There are 0 answers