Spark RDD checkpoint on a persisted/cached RDD performs the DAG twice

When I run code such as the following:

val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
newRDD.checkpoint()
print(newRDD.count())

and watch the stages in YARN, I notice that Spark computes the DAG TWICE -- once for the distinct+count that materializes the RDD and caches it, and then a completely separate SECOND time to create the checkpointed copy.

Since the RDD is already materialized and cached, why doesn't checkpointing simply take advantage of this and save the cached partitions to disk?

Is there an existing way (a configuration setting or code change) to force Spark to take advantage of the cache, so that the operation runs only ONCE and checkpointing simply copies the cached partitions?

Do I need to "materialize" twice, instead?

val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
print(newRDD.count())

newRDD.checkpoint()
print(newRDD.count())

I've created an Apache Spark Jira ticket to make this a feature request: https://issues.apache.org/jira/browse/SPARK-8666

There are 3 answers

Glenn Strycker (BEST ANSWER)

Looks like this may be a known issue. See an older JIRA ticket, https://issues.apache.org/jira/browse/SPARK-8582

David Beavon

This is an old question, but it affected me as well, so I did some digging. Most of what I turned up in the JIRA and GitHub change history was developer discussion of proposed implementation changes, which wasn't very informative; I'd suggest not spending much time on it.

The clearest information I could find on the matter is here: https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md

"An RDD which needs to be checkpointed will be computed twice; thus it is suggested to do a rdd.cache() before rdd.checkpoint()"
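
To make that recommendation concrete, here is a minimal sketch of the cache-then-checkpoint pattern using the names from the question; the checkpoint directory path is an assumption, so point it at whatever location suits your cluster:

import org.apache.spark.storage.StorageLevel

sc.setCheckpointDir("/tmp/spark-checkpoints")  // must be set before calling checkpoint()

val newRDD = prevRDD
  .map(a => (a._1, 1L))
  .distinct
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

newRDD.checkpoint()  // only marks the RDD; nothing runs yet

// The first action computes the lineage once and fills the cache; the
// checkpoint job that follows can then read the cached partitions instead
// of recomputing the whole DAG from the source.
println(newRDD.count())

Without the persist, the checkpoint job would recompute the full lineage a second time.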

Given that the OP actually did use persist and checkpoint, he was probably on the right track. I suspect the only problem was in the way he invoked checkpoint. I'm fairly new to Spark, but note that RDD.checkpoint() returns Unit and marks the RDD in place, whereas Dataset.checkpoint() returns a new, checkpointed Dataset. With the Dataset API, then, you have to keep the returned reference:

val checkpointedDF = df.checkpoint()

Hope this is clear. Based on my testing, keeping the returned Dataset eliminated the redundant recomputation of one of my dataframes.
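
For completeness, here is a self-contained sketch of that Dataset pattern; the session setup, checkpoint directory, and sample data are illustrative assumptions, not part of my original testing. Dataset.checkpoint() (available since Spark 2.1) is eager by default, so the checkpoint is written as soon as the call is made:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("checkpoint-demo").getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

val df = spark.range(1000000L).toDF("id")  // placeholder data

// Keep the returned Dataset: `df` itself still refers to the original,
// un-checkpointed plan.
val checkpointedDF = df.checkpoint()

println(checkpointedDF.count())  // reads back the checkpointed data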

xiaoxinganling

The data you cached may have been evicted due to a lack of memory; open the Storage tab of the Spark UI to check whether the RDD is fully cached. Any evicted partitions have to be recomputed by the checkpoint job.
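
If you would rather check from code than from the UI, a sketch along these lines works; note that getRDDStorageInfo is a developer API, so treat it as a diagnostic aid rather than a stable interface:

// Print how much of each persisted RDD actually made it into the cache.
sc.getRDDStorageInfo.foreach { info =>
  println(s"RDD ${info.id} (${info.name}): " +
    s"${info.numCachedPartitions}/${info.numPartitions} partitions cached, " +
    s"${info.memSize} bytes in memory, ${info.diskSize} bytes on disk")
}

If numCachedPartitions is lower than numPartitions, some partitions were evicted and will have to be recomputed.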