I'm trying to submit spark job from client to cloudera cluster. In cluster we are using CDH-5.3.2 and its spark version is 1.2.0 and hadoop version is 2.5.0. So to test our cluster we are submitting wordcount sample taken from spark web site. We can submit successfully our spark job which is written in java. However, we cannot write our result to file on hdfs. We got the following error,
20/06/25 09:38:16 INFO DAGScheduler: Job 0 failed: saveAsTextFile at SimpleWordCount.java:36, took 5.450531 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID 8, obelix2): java.io.InvalidClassException: org.apache.spark.rdd.PairRDDFunctions; local class incompatible: stream classdesc serialVersionUID = 8789839749593513237, local class serialVersionUID = -4145741279224749316
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Here is our code sample
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
public class SimpleWordCount {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext spark = new JavaSparkContext(conf);
JavaRDD<String> textFile = spark.textFile("hdfs://obelix1:8022/user/U079681/deneme/example.txt");
JavaRDD<String> words = textFile
.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String s) {
return Arrays.asList(s.split(" "));
}
});
JavaPairRDD<String, Integer> pairs = words
.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairRDD<String, Integer> counts = pairs
.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) {
return a + b;
}
});
// System.out.println(counts.collect());
counts.saveAsTextFile("hdfs://obelix1:8022/user/U079681/deneme/result");
}
}
and Maven dependecies are
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.2.0-cdh5.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.5.0-mr1-cdh5.3.2</version>
</dependency>
I have absolutely no idea where the error comes from, since from my understanding application's spark version and cloudera's spark version are same. Any idea would be more than welcome.
Note: We can see the result when we write to console.
After spending hours, we have solved the problem. Our problem's root cause is that we downloaded apache-spark from official site and builded it. So some jars are not competible with cloudera distributions. Finally we have learned today, spark cloudera distribution is available in github(https://github.com/cloudera/spark/tree/cdh5-1.2.0_5.3.2) and after building it we have saved the job result to hdfs.