I am trying to read Kafka streaming data from a Spark Streaming application. While reading the data I get the following exception:

16/12/24 11:09:05 INFO storage.BlockManagerMaster: Registered BlockManager

Exception in thread "main" java.lang.NoClassDefFoundError: kafka/serializer/StringDecoder
    at com.inndata.RSVPSDataStreaming.KafkaToSparkStreaming.main(KafkaToSparkStreaming.java:69)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.serializer.StringDecoder
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 10 more

Here is my version info:

Spark: 1.6.2

Kafka: 0.8.2

Here is my pom.xml:

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>1.6.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8-assembly_2.10</artifactId>
    <version>2.0.0-preview</version>
</dependency>
There are 2 answers

Answer from user7337271:

You are using incompatible and duplicated artifact versions. Remember that when using Spark:

  • all Scala artifacts have to use the same major Scala version (2.10, 2.11);
  • all Spark artifacts have to use the same major Spark version (1.6, 2.0).

In your build definition you mix spark-streaming 1.6 with spark-streaming-kafka 2.0, and you include a duplicated spark-streaming-kafka assembly built for Scala 2.10 while the remaining dependencies are built for Scala 2.11.
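A consistent set of dependencies for Spark 1.6.2 on Scala 2.11 might look like the sketch below. Note that for the Spark 1.6 line the Kafka integration artifact is `spark-streaming-kafka_2.11` (the `spark-streaming-kafka-0-8` and assembly artifacts in the original pom exist only for Spark 2.x, so they should be removed):

```xml
<!-- All Spark artifacts on the 1.6 line, all Scala artifacts on 2.11 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>1.6.2</version>
</dependency>
<!-- Kafka integration matching Spark 1.6.x; this pulls in the Kafka 0.8
     client containing kafka.serializer.StringDecoder transitively -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.11</artifactId>
    <version>1.6.2</version>
</dependency>
```

With these aligned versions the `kafka.serializer.StringDecoder` class should be on the classpath at runtime, provided the application jar is built with its dependencies (e.g. via the shade plugin) or they are supplied to spark-submit.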

Answer from Akash Sethi:

It seems an implicit String Encoder is needed. Try applying this:

import org.apache.spark.sql.Encoder
implicit val stringEncoder = org.apache.spark.sql.Encoders.kryo[String]

You can find more about Encoders here and in the official documentation here.