sparkSession.sql throwing NullPointerException


I have two Scala classes in my spark-sql job: Driver.scala and ExtractorOne.scala.

Driver.scala passes parameters such as the SparkSession object to the different extractors, e.g. ExtractorOne.scala.

In the extractor classes I extract data from Oracle and write it as Parquet files to an HDFS location.

As part of the business logic I have to call sparkSession.sql() to perform some operations. But inside the extract() method of the extractor (called) class, sparkSession throws a NullPointerException. To verify the object I called sparkSession.sql("show tables").show() in the calling function, and it returned results, i.e. there is no issue with the object. Yet the same call inside the called function throws a NullPointerException. Any idea what I am doing wrong here?

Driver.scala

    val spark = ConfigUtils.getSparkSession(...)  // Spark session initialization is successful

    val parquetDf = spark.read.format("parquet")  // note: this returns a DataFrameReader configured for Parquet, not a DataFrame

    // ExtractorOne.scala, ExtractorTwo.scala, etc. are the extractors shown in the other Scala file
    val extractors: LinkedHashMap[String, (DataFrameReader, SparkSession, String, String, String, String) => Unit] =
      Utils.getAllDefinedExtractors()

    for (key: String <- extractors.keys) {

      spark.sql("show tables").show()  // works here: prints the table list

      extractors.get(key).foreach { fun =>
        fun(ora_df_options_conf, spark, keyspace, key.trim(), "", "")
      }
    }


Output of spark.sql("show tables").show() in Driver.scala:





+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+

But the same call gives an error in ExtractorOne.scala:


ExtractorOne.scala

def extract(oraOptionDfConfig: DataFrameReader, sparkSession: SparkSession, keyspace: String,
    columnFamilyName: String, fromDate: String, toDate: String): Unit = {

  val company_df = ..  // some operation to read the data from Oracle into company_df
  val dist_df = company_df.select("id", "quarter", "year").distinct()  // distinct() takes no column args; select the key columns first

  company_df.createOrReplaceTempView("company")

  dist_df.foreach(row => {

    if (row.anyNull) {
      // skip rows with null key columns
    } else {

      val sqlQuery: String = "select * from company where id='%s' and quarter='%s' and year='%s'"
        .format(row.get(0), row.get(1), row.get(2))

      sparkSession.sql("show tables").show()  // throws NullPointerException here

      val partitionDf = sparkSession.sql(sqlQuery)

      partitionDf.show(1)

      writeAsParquet(...)  // save as Parquet file(s)
    }
  })
}


Output of sparkSession.sql("show tables").show() in ExtractorOne.scala:

ERROR:
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:142)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:140)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
    at com.snp.extractors.CompanyModelValsExtractor$$anonfun$extract$1.apply(ExtractorOne.scala:126)
    at com.snp.extractors.CompanyModelValsExtractor$$anonfun$extract$1.apply(ExtractorOne.scala:113)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)

1 Answer

Raphael Roth (BEST ANSWER):

You cannot use the SparkSession in executor-side code (i.e. inside a dist_df.foreach loop); the SparkSession is null in that case, because it only lives on the driver. The foreach closure is serialized and shipped to the executors, where the sparkSession reference deserializes to null, so the first call on it (sessionState inside sql()) throws the NullPointerException.
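A minimal sketch of one possible fix, assuming the set of distinct (id, quarter, year) keys is small enough to collect to the driver: bring the key rows to the driver with collect(), then run the per-key queries in driver-side code where the SparkSession is valid. buildQuery and extractFixed are hypothetical names introduced here; writeAsParquet is the asker's own method and is left as a comment.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Pure helper: builds the same per-partition SQL as in the question.
def buildQuery(id: Any, quarter: Any, year: Any): String =
  "select * from company where id='%s' and quarter='%s' and year='%s'"
    .format(id, quarter, year)

// Driver-side version of extract(): no SparkSession use inside a foreach
// that runs on executors.
def extractFixed(sparkSession: SparkSession, companyDf: DataFrame): Unit = {
  companyDf.createOrReplaceTempView("company")

  // collect() moves the (assumed small) set of distinct keys to the driver,
  // where sparkSession is a valid, non-null object.
  val keyRows = companyDf.select("id", "quarter", "year").distinct().collect()

  keyRows.filter(row => !row.anyNull).foreach { row =>
    val partitionDf = sparkSession.sql(buildQuery(row.get(0), row.get(1), row.get(2)))
    partitionDf.show(1)
    // writeAsParquet(...)  // save as Parquet, as in the original extract()
  }
}
```

If the key set is too large to collect, an alternative is to avoid per-key queries entirely and write the whole DataFrame once with `company_df.write.partitionBy("id", "quarter", "year").parquet(path)`, letting Spark split the output by key.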