Spark Kafka Streaming job failing due to InvalidClassException


I'm running a streaming job on Spark 2 (CDH 5.9) using the Kafka 0.8 client. The simple aim is to persist the information in Impala, record by record.

I can't get rid of this error, and I don't know where it's coming from:

16/12/14 08:43:28 ERROR scheduler.JobScheduler: Error running job streaming
job 1481726608000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 25.0 failed 4 times, most recent failure: Lost task 0.3 in stage 25.0
(TID 132, datanode1, executor 1):
java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat;
local class incompatible: stream classdesc serialVersionUID = 1, 
local class serialVersionUID = 2

The Direct Kafka Stream is created simply by

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val streamingContext = new StreamingContext(spark.sparkContext, Seconds(2))
val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "datanode1:9092,datanode2:9092,datanode3:9092",
  "group.id" -> "myconsumergroup",
  "auto.offset.reset" -> "largest")
val topics: Set[String] = Set("kafkatest")
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)

And processed by:

val deviceMap = spark.read.parquet("/user/admin/temp/joinData.parquet").cache()

directKafkaStream.foreachRDD { rdd =>
  val avgData = spark.read.schema(jsonDatastruct).json(rdd.map(i => i._2)).select("data.*").as[JsonInfo]

  val deviceEnriched = avgData.join(deviceMap, Seq("COMMON_KEY"), "left")

  deviceEnriched.show(false)
  spark.sql("use my_database")
  deviceEnriched.write.mode("append").saveAsTable("tbl_persisted_kafka_stream")
}

streamingContext.start()
streamingContext.awaitTermination()

There is 1 answer

Samson Scharfrichter (BEST ANSWER)

Short answer: the messages were serialized with a version of the commons-lang3 JAR that is not compatible with the JAR you are using with Spark.

Long answer: if you had just Googled that error message, then searched the Apache Commons source code, you would have found...

  • this post that digs into the Java "class incompatible" serialization issue, in general
  • the source code for FastDateFormat stating that serialVersionUID = 1L until V3.1 but switching to serialVersionUID = 2L with V3.2 (because the binary structure changed at that point); see the quick check below
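
To check which side of that split a given classpath is on, here is a minimal sketch (run in spark-shell, or in any JVM with commons-lang3 on the classpath) that prints the serialVersionUID of the locally loaded class:

import java.io.ObjectStreamClass
import org.apache.commons.lang3.time.FastDateFormat

// Prints the serialVersionUID of the FastDateFormat actually on the local
// classpath: 1 means commons-lang3 3.1 or older, 2 means 3.2 or newer.
println(ObjectStreamClass.lookup(classOf[FastDateFormat]).getSerialVersionUID)

Run it on the serializing side and on the deserializing side; the two numbers will differ when you hit this exception.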

By the way, I just checked and CDH 5.9 ships with commons-lang3 in V3.1 (for Hive, Impala, Sentry, Hive-in-Oozie, Sqoop-in-Oozie) and V3.3.2 (for Spark-in-Oozie) and V3.4 (for Sqoop) while Spark itself is not supposed to need it at all. Go figure.
And since CDH does not ship with Spark 2 yet, I guess you either downloaded the "beta" parcel or the Apache version -- and I checked: the Apache version (V2.0.2) ships with commons-lang3 V3.3.2.
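
With that many copies floating around, it also helps to confirm which JAR a given JVM actually loaded the class from. A one-liner sketch (print it in the driver, and inside a foreachRDD closure to see the executors' view in their stdout logs):

import org.apache.commons.lang3.time.FastDateFormat

// Prints the location of the JAR that FastDateFormat was loaded from,
// e.g. .../commons-lang3-3.1.jar vs .../commons-lang3-3.3.2.jar
println(classOf[FastDateFormat].getProtectionDomain.getCodeSource.getLocation)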

My 2 cents: just force --jars /opt/cloudera/parcels/CDH/jars/commons-lang3-3.1.jar in your Spark 2 command line, and see if that's enough to solve your issue.

Edit: For 2 extra cents, make sure that your "custom" JAR gets precedence over whatever JAR is already on the YARN classpath, with --conf spark.yarn.user.classpath.first=true
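
Putting the two together, the submit command would look something like this (a sketch: the launcher name, application class, and application JAR are placeholders for your own):

spark2-submit \
  --master yarn \
  --conf spark.yarn.user.classpath.first=true \
  --jars /opt/cloudera/parcels/CDH/jars/commons-lang3-3.1.jar \
  --class com.example.StreamingToImpala \
  my-streaming-job.jar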