I've got a Spark program that essentially does this:
def foo(a: RDD[...], b: RDD[...]) = {
  val c = a.map(...)
  c.persist(StorageLevel.MEMORY_ONLY_SER)
  var current = b
  for (_ <- 1 to 10) {
    val next = some_other_rdd_ops(c, current)
    next.persist(StorageLevel.MEMORY_ONLY)
    current.unpersist()
    current = next
  }
  current.saveAsTextFile(...)
}
The strange behavior I'm seeing is that the Spark stages corresponding to val c = a.map(...)
run 10 times. I would have expected them to run only once because of the immediate caching on the next line, but that's not the case. When I look at the "Storage" tab of the running job, very few of the partitions of c are cached.
Also, 10 copies of that stage immediately show as "active", and 10 copies of the stage corresponding to val next = some_other_rdd_ops(c, current)
show up as pending; the two roughly alternate execution.
Am I misunderstanding how to get Spark to cache RDDs?
Edit: here is a gist containing a program to reproduce this: https://gist.github.com/jfkelley/f407c7750a086cdb059c. It expects as input the edge list of a graph (with edge weights). For example:
a b 1000.0
a c 1000.0
b c 1000.0
d e 1000.0
d f 1000.0
e f 1000.0
g h 1000.0
h i 1000.0
g i 1000.0
d g 400.0
Lines 31-42 of the gist correspond to the simplified version above. I get 10 stages corresponding to line 31 when I would expect only 1.
Caching doesn't reduce the number of stages; it just means the stage won't be recomputed every time.
In the first iteration, the stage's "Input Size" shows that the data comes from Hadoop and that shuffle input is read. In subsequent iterations the data comes from memory, there is no more shuffle input, and execution time is vastly reduced.
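To make the lazy caching concrete, here is a minimal sketch (the input path, the toUpperCase map, and the count() calls are hypothetical stand-ins, not your actual code): persist() only marks the RDD, so the partitions appear in the Storage tab only after the first action materializes them, and later actions then read them from memory instead of recomputing the map.

import org.apache.spark.storage.StorageLevel

// sc is an existing SparkContext; the path is a made-up stand-in for the real input
val lines = sc.textFile("hdfs:///tmp/edges.txt")
val c = lines.map(_.toUpperCase)           // stands in for a.map(...)
c.persist(StorageLevel.MEMORY_ONLY_SER)    // lazy: nothing is cached yet

c.count()  // first action: the map stage runs, reads from Hadoop, and fills the cache
c.count()  // later actions: partitions come from memory instead of being recomputed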
New map stages are created whenever shuffle output has to be written, for example when the partitioning changes; in your case, that happens when a key is added to the RDD.
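As a hedged illustration of that boundary, continuing the sketch above (the field parsing is made up, not the gist's code): keying the cached RDD and then reducing by key forces a shuffle, so the job gets a map stage that writes shuffle output plus a separate stage that reads it, even though c itself is cached.

// Adding a key means the data must be repartitioned by that key
val keyed = c.map { line =>
  val fields = line.split("\\s+")
  (fields(0), fields(2).toDouble)
}

// reduceByKey writes shuffle output: a new map stage produces it,
// and a separate stage reads the shuffled data and reduces it
val totals = keyed.reduceByKey(_ + _)
totals.count()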