How can I explain the Apache Spark RDD Lineage Graph?


I have a few questions about the code below:

val input1 = rawinput.map(_.split("\t")).map(x => (x(6).trim(), x)).sortByKey()
val input2 = input1.map(x => x._2.mkString("\t"))
val x0 = input2.map(_.split("\t")).map(x => (x(6), x(0)))
val x1 = input2.map(_.split("\t")).map(x => (x(6), x(1)))
val x2 = input2.map(_.split("\t")).map(x => (x(6), x(2)))
val x3 = input2.map(_.split("\t")).map(x => (x(6), x(3)))
val x4 = input2.map(_.split("\t")).map(x => (x(6), x(4)))
val x5 = input2.map(_.split("\t")).map(x => (x(6), x(5)))
val x6 = input2.map(_.split("\t")).map(x => (x(6), x(6)))
val x = x0 union x1 union x2 union x3 union x4 union x5 union x6


**Lineage Graph:**
<pre>
(7) UnionRDD[25] at union at rddCustUtil.scala:78 []
|  UnionRDD[24] at union at rddCustUtil.scala:78 []
|  UnionRDD[23] at union at rddCustUtil.scala:78 []
|  UnionRDD[22] at union at rddCustUtil.scala:78 []
|  UnionRDD[21] at union at rddCustUtil.scala:78 []
|  UnionRDD[20] at union at rddCustUtil.scala:78 []
|  MapPartitionsRDD[7] at map at rddCustUtil.scala:43 []
|  MapPartitionsRDD[6] at map at rddCustUtil.scala:43 []
|  MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
|  ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
    |  MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
    |  /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
    |  /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
|  MapPartitionsRDD[9] at map at rddCustUtil.scala:48 []
|  MapPartitionsRDD[8] at map at rddCustUtil.scala:48 []
|  MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
|  ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
    |  MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
    |  /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
    |  /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
|  MapPartitionsRDD[11] at map at rddCustUtil.scala:53 []
|  MapPartitionsRDD[10] at map at rddCustUtil.scala:53 []
|  MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
|  ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
    |  MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
    |  /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
    |  /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
|  MapPartitionsRDD[13] at map at rddCustUtil.scala:58 []
|  MapPartitionsRDD[12] at map at rddCustUtil.scala:58 []
|  MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
|  ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
    |  MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
    |  /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
    |  /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
|  MapPartitionsRDD[15] at map at rddCustUtil.scala:63 []
|  MapPartitionsRDD[14] at map at rddCustUtil.scala:63 []
|  MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
|  ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
    |  MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
    |  /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
    |  /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
|  MapPartitionsRDD[17] at map at rddCustUtil.scala:68 []
|  MapPartitionsRDD[16] at map at rddCustUtil.scala:68 []
|  MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
|  ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
    |  MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
    |  /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
    |  /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
|  MapPartitionsRDD[19] at map at rddCustUtil.scala:73 []
|  MapPartitionsRDD[18] at map at rddCustUtil.scala:73 []
|  MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
|  ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
    |  MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
    |  /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
    |  /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
</pre>
  • Can you explain how many shuffle stages will be executed, given that ShuffledRDD[4] appears 7 times?
  • Can you give a detailed explanation of the DAG flow above?
  • Is this operation expensive?
1 Answer

Answered by Tzach Zohar (accepted answer):

How many shuffle stages will be executed?

Indeed, the shuffle required to sort your data happens 7 times: Spark's evaluation is lazy and runs on demand, so unless an RDD is cached, it is recomputed for every branch of the DAG that needs it. To avoid this (and probably make the job much faster), cache (or, more generally, persist) input2 before using it multiple times:

val input1 = rawinput.map(_.split("\t")).map(x=>(x(6).trim(),x)).sortByKey()
val input2 = input1.map(x=> x._2.mkString("\t")).cache()
// continue as before
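The recompute-per-branch behaviour is not specific to Spark; it can be illustrated with plain Scala lazy views (an analogy only, not Spark code: the counter plays the role of the expensive shuffle, and `toList` plays the role of `cache()`):

```scala
// Plain-Scala analogy (no Spark): a lazy view is re-evaluated every time it
// is consumed, just as an uncached RDD lineage is re-run for every branch.
var computations = 0
val expensive = (1 to 3).view.map { x => computations += 1; x * 2 }

// Two "branches" each force the full upstream computation again:
val branch1 = expensive.map(_ + 1).sum  // 3 + 5 + 7
val branch2 = expensive.map(_ + 2).sum  // 4 + 6 + 8
println(computations) // 6: each branch re-ran the map

// "Caching" by materializing once, then reusing the result:
computations = 0
val cached = (1 to 3).view.map { x => computations += 1; x * 2 }.toList
val c1 = cached.map(_ + 1).sum
val c2 = cached.map(_ + 2).sum
println(computations) // 3: computed once, reused by both branches
```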

Can you give a detailed explanation of the DAG flow?

Each of your x_ RDDs is computed "separately" using the following lineage (shown here for one of them):

<pre>
|  MapPartitionsRDD[9] at map at rddCustUtil.scala:48 []
|  MapPartitionsRDD[8] at map at rddCustUtil.scala:48 []
|  MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
|  ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
    |  MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
    |  /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
    |  /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
</pre>

Reading bottom-up, this shows the computation that created rawinput from textFile, then the sort (ShuffledRDD[4] from sortByKey), and finally the three map operations.

Then, you have 6 union operations combining these 7 RDDs.
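As a side note (not part of the original answer): chaining `union` six times builds one nested UnionRDD per call, as the lineage above shows. `SparkContext.union` combines all inputs into a single UnionRDD in one step. A minimal sketch, assuming a live SparkContext `sc` and the x0..x6 RDDs from the question:

```scala
// Sketch only: requires a running SparkContext `sc` and the RDDs x0..x6.
// `a union b union c ...` nests a UnionRDD per call; SparkContext.union
// builds one flat UnionRDD over all seven inputs instead.
val x = sc.union(Seq(x0, x1, x2, x3, x4, x5, x6))
```

This does not remove the repeated shuffles, which caching addresses; it only flattens the union step of the DAG.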

Is this operation expensive?

Yes, it seems like it would be. As suggested above, caching can make it much faster, but there is a better way that avoids splitting the RDD into many separate ones in the first place:

val x = rawinput.map(_.split("\t"))
  .keyBy(_(6).trim()) // extract key
  .flatMap{ case (k, arr) => arr.take(7).zipWithIndex.map((k, _)) } // flatMap into (key, (value, index))
  .sortBy { case (k, (_, index)) => (index, k) } // sort by index first, key second
  .map    { case (k, (value, _)) => (k, value) } // remove index, it was just used for sorting
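To see concretely what this pipeline produces, here is a mirror of the same logic on plain Scala collections (no Spark needed; the two tab-separated input rows are made up for illustration):

```scala
// Plain-Scala mirror of the single-pass pipeline above, applied to two
// hypothetical rows with 7 columns each, keyed by column 6.
val rawinput = Seq(
  "a0\ta1\ta2\ta3\ta4\ta5\tk1",
  "b0\tb1\tb2\tb3\tb4\tb5\tk2"
)

val x = rawinput
  .map(_.split("\t"))
  .map(arr => (arr(6).trim(), arr))         // keyBy: column 6 is the key
  .flatMap { case (k, arr) =>
    arr.take(7).zipWithIndex.map { case (v, i) => (k, (v, i)) } // (key, (value, index))
  }
  .sortBy { case (k, (_, i)) => (i, k) }    // index first, key second
  .map { case (k, (v, _)) => (k, v) }       // drop the index

println(x.take(2)) // List((k1,a0), (k2,b0)): column 0 of every row comes first
```

All (key, column-0) pairs come first, then all (key, column-1) pairs, and so on, which is exactly the ordering the seven separate RDDs plus unions produced, but computed in one pass with one shuffle.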

This will perform a single shuffle operation, and won't require persisting the data. The DAG would look like this:

(4) MapPartitionsRDD[9] at map at Test.scala:75 []
 |  MapPartitionsRDD[8] at sortBy at Test.scala:74 []
 |  ShuffledRDD[7] at sortBy at Test.scala:74 []
 +-(4) MapPartitionsRDD[4] at sortBy at Test.scala:74 []
    |  MapPartitionsRDD[3] at flatMap at Test.scala:73 []
    |  MapPartitionsRDD[2] at keyBy at Test.scala:72 []
    |  MapPartitionsRDD[1] at map at Test.scala:71 []
    |  ParallelCollectionRDD[0] at parallelize at Test.scala:64 []