SparkException: local class incompatible


I'm trying to submit a Spark job from a client machine to a Cloudera cluster. The cluster runs CDH-5.3.2, which ships Spark 1.2.0 and Hadoop 2.5.0. To test the cluster we submit the word-count sample taken from the Spark web site. We can successfully submit our Spark job, which is written in Java; however, we cannot write the result to a file on HDFS. We get the following error:

20/06/25 09:38:16 INFO DAGScheduler: Job 0 failed: saveAsTextFile at SimpleWordCount.java:36, took 5.450531 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID 8, obelix2): java.io.InvalidClassException: org.apache.spark.rdd.PairRDDFunctions; local class incompatible: stream classdesc serialVersionUID = 8789839749593513237, local class serialVersionUID = -4145741279224749316
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Here is our code sample:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SimpleWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        JavaSparkContext spark = new JavaSparkContext(conf);

        // Read the input file from HDFS.
        JavaRDD<String> textFile = spark.textFile("hdfs://obelix1:8022/user/U079681/deneme/example.txt");

        // Split each line into words.
        JavaRDD<String> words = textFile
                .flatMap(new FlatMapFunction<String, String>() {
                    public Iterable<String> call(String s) {
                        return Arrays.asList(s.split(" "));
                    }
                });

        // Map each word to a (word, 1) pair.
        JavaPairRDD<String, Integer> pairs = words
                .mapToPair(new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                });

        // Sum the counts for each word.
        JavaPairRDD<String, Integer> counts = pairs
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer a, Integer b) {
                        return a + b;
                    }
                });

        // Uncomment to print the counts to the console instead of writing to HDFS.
//      System.out.println(counts.collect());
        counts.saveAsTextFile("hdfs://obelix1:8022/user/U079681/deneme/result");
    }
}

and the Maven dependencies are:

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.2.0-cdh5.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.5.0-mr1-cdh5.3.2</version>
        </dependency>

I have absolutely no idea where the error comes from, since as far as I understand the application's Spark version and Cloudera's Spark version are the same. Any idea would be more than welcome.

Note: We can see the result when we write it to the console.


There are 2 answers

Best answer, by deadlock89

After spending hours on this, we have solved the problem. The root cause was that we had downloaded Apache Spark from the official site and built it ourselves, so some of the resulting jars were not compatible with the Cloudera distribution. We learned today that Cloudera's Spark distribution is available on GitHub (https://github.com/cloudera/spark/tree/cdh5-1.2.0_5.3.2); after building that, we were able to save the job result to HDFS.
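
For anyone debugging a similar mismatch, a quick way to see which Spark build ends up on the client classpath is to compute the serialVersionUID of the class named in the exception and compare it with the two values in the stack trace. A minimal sketch (the class name is taken from the error above; run it with the same classpath the job is built with):

import java.io.ObjectStreamClass;

public class SerialVersionCheck {
    public static void main(String[] args) throws ClassNotFoundException {
        // The class reported as incompatible in the executor's stack trace.
        Class<?> clazz = Class.forName("org.apache.spark.rdd.PairRDDFunctions");

        // serialVersionUID of the class as seen on this (client) classpath.
        // ObjectStreamClass.lookup returns non-null here because the class is
        // Serializable in this Spark version, as the InvalidClassException shows.
        long localUid = ObjectStreamClass.lookup(clazz).getSerialVersionUID();
        System.out.println("client-side serialVersionUID = " + localUid);

        // If this matches the "stream classdesc serialVersionUID" but not the
        // "local class serialVersionUID" from the executor, the client jars were
        // built from a different Spark source tree than the cluster's.
    }
}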

Answer by Mikel Urkia

As your note mentions, the application works correctly when the results are printed to the console, but fails whenever you try to save them to the underlying HDFS.

If I am not mistaken, this implies that:

  • When writing the results to the console, Spark might not be using the underlying Hadoop infrastructure.

  • When saving the results to HDFS, Spark does use the underlying Hadoop infrastructure.

These scenarios make me think that a Hadoop version mismatch is happening somewhere. Although the Spark versions of the application and the cluster nodes may match, there could still be a difference in the Hadoop version being used.

You should take a look at the libraries used by CDH-5.3.2 and check that all of them match the ones used in your application, as sketched below.
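
If it helps, one rough way to verify this on the running cluster is to print the Spark and Hadoop versions seen by the driver and by an executor, and compare them with the versions your application was compiled against. A sketch, assuming the same kind of SparkConf setup as in the question (class and app names here are just placeholders):

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.util.VersionInfo;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class VersionCheck {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Version check"));

        // Versions on the driver side, i.e. the application's own classpath.
        System.out.println("driver Spark  version: " + sc.version());
        System.out.println("driver Hadoop version: " + VersionInfo.getVersion());

        // Version on an executor, i.e. the cluster's classpath: run a one-element
        // job and return what the executor reports.
        List<String> fromExecutor = sc.parallelize(Arrays.asList(1))
                .map(new Function<Integer, String>() {
                    public String call(Integer ignored) {
                        return "executor Hadoop version: " + VersionInfo.getVersion();
                    }
                })
                .collect();
        System.out.println(fromExecutor.get(0));

        sc.stop();
    }
}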

Also, take a look at this question:

Standalone spark cluster. Can't submit job programmatically -> java.io.InvalidClassException