I am running a standalone Spark setup and I have the code below, related to EdgeRDD. These are graph edges loaded from a text file; there are around 67 million records.
import org.apache.spark.graphx.{Edge, EdgeRDD}
import org.apache.spark.rdd.RDD

// edge_file: RDD[String]; each line has whitespace-separated source id, destination id and an Int attribute
val edges: RDD[Edge[Int]] = edge_file.map { line =>
  val x = line.split("\\s+"); Edge(x(0).toLong, x(1).toLong, x(2).toInt)
}
val edges1: EdgeRDD[Int] = EdgeRDD.fromEdges(edges)
println(edges1.count)
The issue is that even just counting them gets stuck at the RDD-creation stage. I have a machine with 24 GB of RAM. What would be the optimal settings for the executors and the driver? Do I need to set any additional configuration in spark-env.sh? I am running Spark 1.4.0.
spark-1.4.0-bin-hadoop2.6/bin/spark-submit --executor-memory 10g --driver-memory 10g --class "GraphParser" --master local[6] target/scala-2.10/simple-project_2.10-1.0.jar 100
Here is the output:
15/06/17 02:32:42 INFO SparkContext: Starting job: reduce at EdgeRDDImpl.scala:89
15/06/17 02:32:42 INFO DAGScheduler: Got job 1 (reduce at EdgeRDDImpl.scala:89) with 6 output partitions (allowLocal=false)
15/06/17 02:32:42 INFO DAGScheduler: Final stage: ResultStage 1(reduce at EdgeRDDImpl.scala:89)
15/06/17 02:32:42 INFO DAGScheduler: Parents of final stage: List()
15/06/17 02:32:42 INFO DAGScheduler: Missing parents: List()
15/06/17 02:32:42 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[11] at map at EdgeRDDImpl.scala:89), which has no missing parents
15/06/17 02:32:42 INFO MemoryStore: ensureFreeSpace(2904) called with curMem=507670, maxMem=8890959790
15/06/17 02:32:42 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 2.8 KB, free 8.3 GB)
15/06/17 02:32:42 INFO MemoryStore: ensureFreeSpace(1766) called with curMem=510574, maxMem=8890959790
15/06/17 02:32:42 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 1766.0 B, free 8.3 GB)
15/06/17 02:32:42 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:55287 (size: 1766.0 B, free: 8.3 GB)
15/06/17 02:32:42 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:874
15/06/17 02:32:42 INFO DAGScheduler: Submitting 6 missing tasks from ResultStage 1 (MapPartitionsRDD[11] at map at EdgeRDDImpl.scala:89)
15/06/17 02:32:42 INFO TaskSchedulerImpl: Adding task set 1.0 with 6 tasks
15/06/17 02:32:42 INFO FairSchedulableBuilder: Added task set TaskSet_1 tasks to pool default
15/06/17 02:32:47 WARN TaskSetManager: Stage 1 contains a task of very large size (140947 KB). The maximum recommended task size is 100 KB.
15/06/17 02:32:47 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, PROCESS_LOCAL, 144329897 bytes)
15/06/17 02:32:53 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, PROCESS_LOCAL, 145670467 bytes)
15/06/17 02:32:58 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 4, localhost, PROCESS_LOCAL, 145674593 bytes)
15/06/17 02:33:03 INFO TaskSetManager: Starting task 3.0 in stage 1.0 (TID 5, localhost, PROCESS_LOCAL, 145687533 bytes)
15/06/17 02:33:08 INFO TaskSetManager: Starting task 4.0 in stage 1.0 (TID 6, localhost, PROCESS_LOCAL, 145694247 bytes)
15/06/17 02:33:12 INFO TaskSetManager: Starting task 5.0 in stage 1.0 (TID 7, localhost, PROCESS_LOCAL, 145686985 bytes)
15/06/17 02:33:12 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
15/06/17 02:33:12 INFO Executor: Running task 2.0 in stage 1.0 (TID 4)
15/06/17 02:33:12 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)
15/06/17 02:33:12 INFO Executor: Running task 5.0 in stage 1.0 (TID 7)
15/06/17 02:33:12 INFO Executor: Running task 4.0 in stage 1.0 (TID 6)
15/06/17 02:33:12 INFO Executor: Running task 3.0 in stage 1.0 (TID 5)
After going through the log, I figured out that my task size was too large, which made the tasks slow to schedule; Spark itself warns about this ("Stage 1 contains a task of very large size (140947 KB). The maximum recommended task size is 100 KB.").
That led me to partition the data, using code along the lines of the sketch below.
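A minimal sketch of what that repartitioning can look like, reusing the edges RDD from above; the repartition call and the partition count of 240 are illustrative, not the exact values I used:

// Spread the ~67M edges over many more partitions so each task stays small.
// 240 is illustrative: a common rule of thumb is a few partitions per core,
// sized so each holds on the order of 100-200 MB of data.
val repartitionedEdges = edges.repartition(240)
val edges1: EdgeRDD[Int] = EdgeRDD.fromEdges(repartitionedEdges)
println(edges1.count)

Alternatively, the partition count can be set when the file is first read, e.g. sc.textFile(path, minPartitions), which avoids the extra shuffle that repartition triggers.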
That fixed the issue. I got the results back in 45 seconds. Hope this will be useful to someone.