I'm trying to run a sample like StructuredKafkaWordCount. I started with the Spark Structured Streaming Programming guide.

My code is

package io.boontadata.spark.job1

import org.apache.spark.sql.SparkSession

object DirectKafkaAggregateEvents {
  val FIELD_MESSAGE_ID = 0
  val FIELD_DEVICE_ID = 1
  val FIELD_TIMESTAMP = 2
  val FIELD_CATEGORY = 3
  val FIELD_MEASURE1 = 4
  val FIELD_MEASURE2 = 5

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(s"""
        |Usage: DirectKafkaAggregateEvents <brokers> <subscribeType> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <subscribeType> sample value: subscribe
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    val Array(bootstrapServers, subscribeType, topics) = args

    val spark = SparkSession
      .builder
      .appName("boontadata-spark-job1")
      .getOrCreate()

    import spark.implicits._

    // Create DataSet representing the stream of input lines from kafka
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Generate running word count
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    // Start running the query that prints the running counts to the console
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }

}

I added the following sbt files:

build.sbt:

name := "boontadata-spark-job1"
version := "0.1"
scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.1.1"
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.1.1"

// META-INF discarding
assemblyMergeStrategy in assembly := { 
   {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
   }
}

I also added project/assembly.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

This creates a Uber jar with the non provided jars.

I submit with the following line:

spark-submit boontadata-spark-job1-assembly-0.1.jar ks1:9092,ks2:9092,ks3:9092 subscribe sampletopic

but I get this runtime error:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
        at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80)
        at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
        at io.boontadata.spark.job1.DirectKafkaAggregateEvents$.main(StreamingJob.scala:41)
        at io.boontadata.spark.job1.DirectKafkaAggregateEvents.main(StreamingJob.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
        at scala.util.Try.orElse(Try.scala:84)
        at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:132)
        ... 18 more
16/12/23 13:32:48 INFO spark.SparkContext: Invoking stop() from shutdown hook

Is there a way to know which class is not found so that I can search the maven.org repo for that class.

The lookupDataSource source code seems to be at line 543 at https://github.com/apache/spark/blob/83a6ace0d1be44f70e768348ae6688798c84343e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala but I couldn't find a direct link with Kafka data source...

Complete source code is here: https://github.com/boontadata/boontadata-streams/tree/ad0d0134ddb7664d359c8dca40f1d16ddd94053f

8

There are 8 answers

3
Sree Eedupuganti On BEST ANSWER

I tried like this it's working for me. Submit like this and let me know once you have any issues

./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 --class com.inndata.StructuredStreaming.Kafka --master local[*] /Users/apple/.m2/repository/com/inndata/StructuredStreaming/0.0.1SNAPSHOT/StructuredStreaming-0.0.1-SNAPSHOT.jar
1
ssice On

In my case I also got this error while compiling with sbt, and the cause was that sbt assembly was not including the spark-sql-kafka-0-10_2.11 artifact as part of the fat jar.

(I would be very welcome to comments here. The dependency was not specified a scope, so it should not be assumed to be "provided").

So I changed to deploying a normal (slim) jar and including the dependencies with the --jars parameters to spark-submit.

In order to gather all dependencies in one place, you can add retrieveManaged := true to your sbt project settings, or you can, in the sbt console, issue:

> set retrieveManaged := true
> package

That should bring all dependencies to the lib_managed folder.

Then you can copy all those files (with a bash command you can for example use something like this

cd /path/to/your/project

JARLIST=$(find lib_managed -name '*.jar'| paste -sd , -)

spark-submit [other-args] target/your-app-1.0-SNAPSHOT.jar --jars "$JARLIST"
0
Raghav On

I solved it by downloading the jar file to the driver system. From there, I supplied the jar to spark submit with --jar option.

Also to be noted is that i was packaging the whole spark 2.1 environment in my uber jar (since my cluster is still on 1.6.1) For some reason, its not picked up when included in uber jar.

spark-submit --jar /ur/path/spark-sql-kafka-0-10_2.11:2.1.0 --class ClassNm --Other-Options YourJar.jar

0
dalin qin On

I'm using spark 2.1 and facing the very same problem my workaround is

1) spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

2) cd ~/.ivy2/jars here you are ,all the needed jars are in this folder now

3) copy all the jars in this folder to all the nodes(can create a specific folder holding them)

4) add the folder name to spark.driver.extraClassPath and spark.driver.extraClassPath ,e.g. spark.driver.extraClassPath=/opt/jars/*:your_other_jars

5 spark-submit --class ClassNm --Other-Options YourJar.jar works fine now

5
Jacek Laskowski On

The issue is the following section in build.sbt:

// META-INF discarding
assemblyMergeStrategy in assembly := { 
   {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
   }
}

It says that all META-INF entires should be discarded, including the "code" that makes data source aliases (e.g. kafka) work.

But the META-INF files are very important for kafka (and other aliases of streaming data sources) to work.

For kafka alias to work Spark SQL uses META-INF/services/org.apache.spark.sql.sources.DataSourceRegister with the following entry:

org.apache.spark.sql.kafka010.KafkaSourceProvider

KafkaSourceProvider is responsible to register kafka alias with the proper streaming data source, i.e. KafkaSource.

Just to check that the real code is indeed available, but the "code" that makes the alias registered is not, you could use the kafka data source by the fully-qualified name (not the alias) as follows:

spark.readStream.
  format("org.apache.spark.sql.kafka010.KafkaSourceProvider").
  load

You will see other problems due to missing options like kafka.bootstrap.servers, but...we're digressing.

A solution is to MergeStrategy.concat all META-INF/services/org.apache.spark.sql.sources.DataSourceRegister (that would create an uber-jar with all data sources, incl. the kafka data source).

case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
0
Algomeister On

This is in view of Jacek Laskowski's answer.

Those of you building your project on maven can try this out. Add the line mentioned below to your maven-shade-plugin.

META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

I've put down the plugin code for the pom file as an example to show where to add the line.


<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.1.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>
                            META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
                        </resource>
                    </transformer>
                </transformers>
                <finalName>${project.artifactId}-${project.version}-uber</finalName>
            </configuration>
        </execution>
    </executions>
</plugin>

Please excuse my formatting skills.

0
Falco Winkler On

I am using gradle as a build tool and the shadowJar plugin to create the uberJar. The solution was simply to add a File

src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister  

to the project.

In this file you need to put, line by line, the class names of the DataSources you use, in this case it would be org.apache.spark.sql.kafka010.KafkaSourceProvider (find that class name for example here)

The reason is that Spark uses the java ServiceLoader in it's internal dependency management mechanisms.

Full example here.

0
Abdul Mannan On

Although this is an old thread, I faced this issue with Pyspark 2.3.3 on Hortonworks 3.1.5 so I thought maybe it can help others. The following jars are required for spark streaming integration with Kafka 2.

Note: Please download appropriate jars according to Spark & Kafka's version.