Unexpected Spark caching behavior


I've got a Spark program that essentially does this:

def foo(a: RDD[...], b: RDD[...]) = {
  val c = a.map(...)
  c.persist(StorageLevel.MEMORY_ONLY_SER)
  var current = b
  for (_ <- 1 to 10) {
    val next = some_other_rdd_ops(c, current)
    next.persist(StorageLevel.MEMORY_ONLY)
    current.unpersist()
    current = next
  }
  current.saveAsTextFile(...)
}

The strange behavior I'm seeing is that the Spark stages corresponding to val c = a.map(...) run 10 times. I would have expected them to run only once because of the immediate caching on the next line, but that's not the case. When I look at the "Storage" tab of the running job, very few of the partitions of c are cached.

Also, 10 copies of that stage immediately show as "active". 10 copies of the stage corresponding to val next = some_other_rdd_ops(c, current) show up as pending, and they roughly alternate execution.

Am I misunderstanding how to get Spark to cache RDDs?

Edit: here is a gist containing a program to reproduce this: https://gist.github.com/jfkelley/f407c7750a086cdb059c. It expects as input the edge list of a graph (with edge weights). For example:

a   b   1000.0
a   c   1000.0
b   c   1000.0
d   e   1000.0
d   f   1000.0
e   f   1000.0
g   h   1000.0
h   i   1000.0
g   i   1000.0
d   g   400.0

Lines 31-42 of the gist correspond to the simplified version above. I get 10 stages corresponding to line 31 when I would only expect 1.


There are 2 answers

Marius Soutier

Caching doesn't reduce the number of stages; it just means the stage isn't recomputed every time.

In the first iteration, the stage's "Input Size" shows that the data comes from Hadoop and that shuffle input is read. In subsequent iterations the data comes from memory, there is no more shuffle input, and the execution time is vastly reduced.

New map stages are created whenever shuffle output has to be written, for example when the partitioning changes; in your case, that happens when a key is added to the RDD.
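
To illustrate, here is a minimal sketch (assuming spark-shell, where sc is already defined, and a placeholder input path): the stage for the flatMap is scheduled for both jobs, but in the second one its tasks read the cached partitions from memory instead of re-reading the input.

import org.apache.spark.storage.StorageLevel

// Placeholder path; sc is the SparkContext provided by spark-shell.
val words = sc.textFile("hdfs:///path/to/input.txt").flatMap(_.split("\\s+"))
words.persist(StorageLevel.MEMORY_ONLY_SER)

words.count()               // first action: the stage reads its input from Hadoop and fills the cache
words.map(_.length).count() // the same stage shows up again for this job, but its tasks read the
                            // cached partitions from memory instead of re-reading the input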

Michael Mior

The problem here is that calling cache is lazy. Nothing will be cached until an action is triggered and the RDD is evaluated. All the call does is set a flag in the RDD to indicate that it should be cached when evaluated.

unpersist, however, takes effect immediately. It clears the flag indicating that the RDD should be cached and also begins purging the data from the cache. Since you only have a single action at the end of your application, this means that by the time any of the RDDs are evaluated, Spark does not see that any of them should be persisted!
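
You can see the effect in isolation with a minimal sketch like this (again assuming spark-shell, where sc is already defined, and a placeholder input path):

import org.apache.spark.storage.StorageLevel

// Placeholder path; sc is the SparkContext provided by spark-shell.
val rdd = sc.textFile("hdfs:///path/to/data.txt").map(_.length)

rdd.persist(StorageLevel.MEMORY_ONLY) // only sets a flag; nothing is computed or cached yet
rdd.unpersist()                       // takes effect immediately: the flag (and any cached data) is cleared
rdd.count()                           // the RDD is finally evaluated here, but it is no longer marked
                                      // for caching, so nothing shows up under the "Storage" tab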

I agree that this is surprising behaviour. The way that some Spark libraries (including the PageRank implementation in GraphX) work around this is by explicitly materializing each RDD between the calls to cache and unpersist. For example, in your case you could do the following:

def foo(a: RDD[...], b: RDD[...]) = {
  val c = a.map(...)
  c.persist(StorageLevel.MEMORY_ONLY_SER)
  var current = b
  for (_ <- 1 to 10) {
    val next = some_other_rdd_ops(c, current)
    next.persist(StorageLevel.MEMORY_ONLY)
    next.foreachPartition(x => {}) // materialize before unpersisting
    current.unpersist()
    current = next
  }
  current.saveAsTextFile(...)
}
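
foreachPartition with an empty function appears to be used here as a cheap action: it forces every partition of next to be computed (and therefore cached) without collecting anything back to the driver. Something like count() would presumably work as well, at the cost of returning a result you don't need.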