I'm running a PySpark application in local mode, with driver-memory set to 14g (installed RAM is 16g).
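For context, the session is created roughly like this (a minimal sketch; the app name is a placeholder, and the driver memory is configured before the JVM starts):

from pyspark.sql import SparkSession

# minimal sketch of my setup; "bfs-app" is a made-up name
spark = (SparkSession.builder
         .master("local[*]")
         .appName("bfs-app")
         .config("spark.driver.memory", "14g")
         .getOrCreate())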
I have two DataFrames: ve (227 KB, 17,384 rows) and e (2,671 KB, 139,159 rows):
ve = spark.read.options(header='True', delimiter=',').csv(r"vertix.csv")
e = spark.read.options(header='True', delimiter=',').csv(r"edges.csv")
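GraphFrames expects the vertex frame to have an id column and the edge frame to have src and dst columns (mine do). A quick sanity check, for reference:

ve.printSchema()  # expect: id, plus any attribute columns
e.printSchema()   # expect: src and dst, plus any attribute columns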
I created a GraphFrame and looped through the vertices (17,384 elements) to calculate BFS from one fixed source vertex to each of them. The results should be appended to the f_df DataFrame to be used later.
from pyspark.sql.functions import array, collect_list, concat
from graphframes import GraphFrame

f_df = spark.createDataFrame([], schm)  # empty result frame; schm is my result schema

# create the graphframe
g = GraphFrame(ve, e)

v_list = ve.dropDuplicates(['id']).select(collect_list('id')).first()[0]  # list of vertices
for i in v_list:
    bfs_paths = g.bfs('id = "7273"', 'id = "' + i + '"', maxPathLength=15)  # loop destination
    if len(bfs_paths.columns) > 2:  # a path was found (more than just the from/to columns)
        paths = bfs_paths.withColumn("path", concat(array(*bfs_paths.columns[1::2]))).select('path')
        f_df = f_df.union(paths)
The error appears after ~400 iterations, and I'm not sure how to fix this, since I need a DataFrame that contains all the BFS paths from that one source vertex (my best guess at a workaround is sketched after the stack trace below).
Error:
py4j.protocol.Py4JJavaError: An error occurred while calling o11452.run.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 25555.0 failed 1 times, most recent failure: Lost task 7.0 in stage 25555.0 (TID 20280) (DESKTOP-DQERUUR executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.LinkedList.linkLast(LinkedList.java:142)
at java.util.LinkedList.add(LinkedList.java:338)
at org.apache.spark.sql.execution.BufferedRowIterator.append(BufferedRowIterator.java:73)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage32.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:779)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:168)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2486/867138882.apply(Unknown Source)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.LinkedList.linkLast(LinkedList.java:142)
at java.util.LinkedList.add(LinkedList.java:338)
at org.apache.spark.sql.execution.BufferedRowIterator.append(BufferedRowIterator.java:73)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage32.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:779)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:168)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2486/867138882.apply(Unknown Source)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
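My guess is that each union() grows the logical plan, so by iteration ~400 the lineage is enormous and planning/execution exhausts the heap. A minimal sketch of a workaround I'm considering, assuming the growing lineage is the cause (untested; the checkpoint directory and the batch size of 50 are placeholders):

spark.sparkContext.setCheckpointDir(r"C:\tmp\spark-checkpoints")  # placeholder path

for n, i in enumerate(v_list):
    bfs_paths = g.bfs('id = "7273"', 'id = "' + i + '"', maxPathLength=15)
    if len(bfs_paths.columns) > 2:
        paths = bfs_paths.withColumn("path", concat(array(*bfs_paths.columns[1::2]))).select('path')
        f_df = f_df.union(paths)
    if n % 50 == 0:
        f_df = f_df.checkpoint()  # materialize results and cut the accumulated union lineage

Would something like this be the right direction, or is there a better way to collect all the paths?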