Spark session Null Pointer with Checkpointing


I have enabled checkpointing, saving the logs to S3. When there are NO files in the checkpoint directory, Spark Streaming works fine and I can see log files appearing in the checkpoint directory. Then I kill Spark Streaming and restart it. This time, with log files present in the checkpoint directory, I start getting a NullPointerException for the Spark session. Below is the code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object asf {
  val microBatchInterval = 5

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession
      .builder()
      .appName("Streaming")
      .getOrCreate()

    val conf = new SparkConf(true)
    //conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val sparkContext = SparkContext.getOrCreate(conf)

    val checkpointDirectory = "s3a://bucketname/streaming-checkpoint"

    println("Spark session: " + sparkSession)

    // s3Config.getConfig() is my helper that supplies the Hadoop Configuration
    // (S3 credentials) used to read the checkpoint directory.
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => {
        createStreamingContext(sparkContext, microBatchInterval, checkpointDirectory, sparkSession)
      }, s3Config.getConfig())

    ssc.start()
    ssc.awaitTermination()
  }

  def createStreamingContext(sparkContext: SparkContext, microBatchInterval: Int, checkpointDirectory: String, spark: SparkSession): StreamingContext = {
    println("Spark session inside: " + spark)
    val ssc = new StreamingContext(sparkContext, Seconds(microBatchInterval))
    //TODO: StorageLevel.MEMORY_AND_DISK_SER
    val lines = ssc.receiverStream(new EventHubClient(StorageLevel.MEMORY_AND_DISK_SER))
    lines.foreachRDD {
      rdd => {
        val df = spark.read.json(rdd)
        df.show()
      }
    }
    ssc.checkpoint(checkpointDirectory)
    ssc
  }
}

And again, the very first time I run this code (with NO log files in the checkpoint directory), I can see the data frame being printed out. When I run it with log files in the checkpoint directory, I don't even see

println("Spark session inside: " + spark)

getting printed, even though it IS printed the first time. The error:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:111)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:109)
    at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:549)
    at org.apache.spark.sql.SparkSession.read(SparkSession.scala:605)

And the error is happening at:

val df = spark.read.json(rdd)

Edit: I added this line:

conf.set("spark.streaming.stopGracefullyOnShutdown","true")

and it still made no difference; I am still getting the NullPointerException.

There are 2 answers

Answer by Ahmed:

To answer my own question, this works:

lines.foreachRDD {
  rdd => {
    // Rebuild the session from the recovered SparkContext on each batch
    // instead of using the sparkSession captured in the checkpointed closure.
    val sqlContext: SQLContext = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate().sqlContext

    val df = sqlContext.read.json(rdd)
    df.show()
  }
}

Building the Spark session from rdd.sparkContext inside foreachRDD works. When the streaming context is recovered from the checkpoint, the foreachRDD closure is deserialized, and the sparkSession it captured at definition time is not restored, so obtaining the session from the live SparkContext on each batch avoids the null reference.
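A closely related variant is the lazily instantiated singleton session that the Spark Streaming programming guide shows for DataFrame work inside foreachRDD. Here is a sketch of that pattern; the object name is illustrative, and lines is the receiver stream from the question:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Lazily instantiated singleton: the session is (re)created from the
// recovered SparkConf on first use after a restart, and is never
// captured in the checkpointed closure.
object SparkSessionSingleton {
  @transient private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession.builder.config(sparkConf).getOrCreate()
    }
    instance
  }
}

// Usage inside the output operation:
lines.foreachRDD { rdd =>
  val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
  spark.read.json(rdd).show()
}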

Answer by Jeevan:

Just to put it explicitly for the benefit of newbies: this is an anti-pattern. Creating a Dataset inside a transformation is not allowed!

As Michel mentioned, the executors won't have access to the SparkSession.
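To make the driver/executor distinction concrete, here is a minimal sketch (assuming lines is the receiver stream from the question). The body of foreachRDD runs on the driver once per batch, where obtaining a session is fine; functions passed to RDD transformations such as map run on the executors, where no SparkSession exists:

lines.foreachRDD { rdd =>
  // Driver side: runs once per batch, so it is safe to obtain the session here.
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  spark.read.json(rdd).show()

  // Executor side: the function passed to map() is serialized to the workers,
  // where there is no SparkSession; using `spark` in it would fail.
  // rdd.map(record => spark.read.json(...))  // don't do this
}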