Persisting / Sharing a RDD in Spark Job Server

1.1k views Asked by At

I want a RDD from a spark job to be persisted, so that it can be used by all subsequent jobs using Spark Job Server. Here is what i have tried:

Job 1:

package spark.jobserver

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark._
import org.apache.spark.SparkContext._
import scala.util.Try

object FirstJob extends SparkJob with NamedRddSupport {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[4]").setAppName("FirstJob")
    val sc = new SparkContext(conf)
    val config = ConfigFactory.parseString("")
    val results = runJob(sc, config)
    println("Result is " + results)
  }

  override def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid

  override def runJob(sc: SparkContext, config: Config): Any = {

    // the below variable is to be accessed by other jobs:
    val to_be_persisted : org.apache.spark.rdd.RDD[String] = sc.parallelize(Seq("some text"))

    this.namedRdds.update("resultsRDD", to_be_persisted)
    return to_be_persisted
  }
}

Job 2:

package spark.jobserver

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark._
import org.apache.spark.SparkContext._
import scala.util.Try


object NextJob extends SparkJob with NamedRddSupport {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[4]").setAppName("NextJob")
    val sc = new SparkContext(conf)
    val config = ConfigFactory.parseString("")
    val results = runJob(sc, config)
    println("Result is " + results)
  }

  override def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid

  override def runJob(sc: SparkContext, config: Config): Any = {

    val rdd = this.namedRdds.get[(String, String)]("resultsRDD").get
    rdd
  }
}

The error i get is :

{
  "status": "ERROR",
  "result": {
    "message": "None.get",
    "errorClass": "java.util.NoSuchElementException",
    "stack": ["scala.None$.get(Option.scala:313)", "scala.None$.get(Option.scala:311)", "spark.jobserver.NextJob$.runJob(NextJob.scala:30)", "spark.jobserver.NextJob$.runJob(NextJob.scala:16)", "spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:278)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)", "java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)", "java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)", "java.lang.Thread.run(Thread.java:745)"]
  }

please modify the above code so that to_be_persisted is accessible. Thanks

Edit:

created spark context, after compiling and packaging scala sources using:

curl -d "" 'localhost:8090/contexts/test-context?num-cpu-cores=4&mem-per-node=512m'

calling FirstJob and NextJob using:

curl -d "" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.FirstJob&context=test-context&sync=true'

curl -d "" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.NextJob&context=test-context&sync=true'
1

There are 1 answers

1
Tzach Zohar On BEST ANSWER

There seem to be two issues here:

  1. If you're using latest spark-jobserver version (0.6.2-SNAPSHOT), there's an open bug about NamedObjects not working properly - seems to fit your description: https://github.com/spark-jobserver/spark-jobserver/issues/386.

  2. You also have a small type mismatch - in FirstJob you're persisting an RDD[String], and in NextJob you're trying to fetch an RDD[(String, String)] - in NextJob, should read val rdd = this.namedRdds.get[String]("resultsRDD").get).

I've tried your code with spark-jobserver version 0.6.0 and with the above-said small type fix, and it works.