When I run code such as the following:
val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
newRDD.checkpoint()
print(newRDD.count())
and watch the stages in YARN, I notice that Spark performs the DAG computation TWICE -- once for the distinct+count that materializes the RDD and caches it, and then a completely SECOND time to create the checkpointed copy.
Since the RDD is already materialized and cached, why doesn't the checkpointing simply take advantage of this, and save the cached partitions to disk?
Is there an existing way (some kind of configuration setting or code change) to force Spark to take advantage of this, so the operation runs only ONCE and checkpointing simply copies the cached partitions?
Do I need to "materialize" twice, instead?
val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
print(newRDD.count())
newRDD.checkpoint()
print(newRDD.count())
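For reference, here is the double-materialization workaround as a self-contained sketch (a minimal local-mode example; the input data, app name, and checkpoint directory are placeholders I've made up for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object CheckpointWorkaround {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("checkpoint-demo").setMaster("local[*]"))
    sc.setCheckpointDir("/tmp/spark-checkpoints") // placeholder path

    val prevRDD = sc.parallelize(Seq(("a", 10), ("b", 20), ("a", 30))) // toy input

    val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)

    // First action: materializes the RDD and populates the cache.
    println(newRDD.count())

    // Mark for checkpointing only after the cache is populated; the
    // checkpoint job triggered by the next action can then read the
    // cached partitions instead of recomputing the full lineage.
    newRDD.checkpoint()
    println(newRDD.count())

    sc.stop()
  }
}
```

Note that `checkpoint()` must still be followed by an action before the data is actually written to the checkpoint directory; marking alone does nothing.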
I've created an Apache Spark Jira ticket to make this a feature request: https://issues.apache.org/jira/browse/SPARK-8666
Looks like this may be a known issue; see this older JIRA ticket: https://issues.apache.org/jira/browse/SPARK-8582