Zeppelin 6.5 + Apache Kafka connector for Structured Streaming 2.0.2


I'm trying to run a Zeppelin notebook that contains Spark's Structured Streaming example with the Kafka connector.

> Kafka is up and running on localhost port 9092

> From the Zeppelin notebook, sc.version returns String = 2.0.2

Here is my environment:

Kafka: kafka_2.10-0.10.1.0

Zeppelin: zeppelin-0.6.2-bin-all

Spark: spark-2.0.2-bin-hadoop2.7

Here is the code in my Zeppelin notebook:

import org.apache.spark.sql.functions.{explode, split}

// Set up the connection to Kafka
val kafka = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // comma-separated list of host:port brokers
  .option("subscribe", "twitter")                      // comma-separated list of topics
  .option("startingOffsets", "latest")                 // read data from the end of the stream
  .load()
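
The imports of explode and split suggest the notebook was meant to continue with the standard word-count query. For reference, a sketch of that continuation (the console sink and output mode here are illustrative, not taken from the original notebook):

import org.apache.spark.sql.functions.col

// Split each Kafka record's value into words and count them
val words = kafka
  .selectExpr("CAST(value AS STRING) AS line")
  .select(explode(split(col("line"), " ")).as("word"))

val query = words.groupBy("word").count()
  .writeStream
  .outputMode("complete") // re-emit the full counts on every trigger
  .format("console")
  .start()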

Here is the error I'm getting when I run the notebook:

import org.apache.spark.sql.functions.{explode, split}

java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
  at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
  at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79)
  at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80)
  at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
  ... 86 elided
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
  at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
  at scala.util.Try$.apply(Try.scala:192)

Any help or advice would be greatly appreciated.

Thanks


There are 2 answers

Answer from hster:

You have probably figured this out already, but I'm posting the answer for others: you have to add the following to zeppelin-env.sh.j2:

export SPARK_SUBMIT_OPTIONS="--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0"

along with potentially other dependencies if you are using the Kafka client directly:

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0,org.apache.spark:spark-sql_2.11:2.1.0,org.apache.kafka:kafka_2.11:0.10.0.1,org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0,org.apache.kafka:kafka-clients:0.10.0.1
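
After restarting the Spark interpreter, a quick way to confirm the source now resolves is to load the stream and inspect its schema (a sketch; the broker and topic match the question):

// Should no longer throw "Failed to find data source: kafka"
val probe = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "twitter")
  .load()

// The kafka source exposes key, value, topic, partition, offset, timestamp, timestampType
probe.printSchema()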
Answer from Gru97:

This solution has been tested with Zeppelin version 0.10.1.

You need to add your code's dependencies, which can be done through the Zeppelin UI. Go to the Interpreter panel (http://localhost:8080/#/interpreter) and, in the spark section under Dependencies, add the artifact for each dependency. If adding spark-sql-kafka leads to other dependency issues, add all the packages spark-sql-kafka needs; you can find them in the Compile Dependencies section of its Maven repository page.

I'm working with Spark version 3.0.0 and Scala version 2.12, and I was trying to integrate Spark with Kafka. I managed to get past this issue by adding all the artifacts below:

org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0

com.google.code.findbugs:jsr305:3.0.0

org.apache.spark:spark-tags_2.12:3.3.0

org.apache.spark:spark-token-provider-kafka-0-10_2.12:3.3.0

org.apache.kafka:kafka-clients:3.0.0
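
With those artifacts in place and the interpreter restarted, a minimal end-to-end paragraph for Spark 3 / Scala 2.12 might look like this (the topic name and startingOffsets value are illustrative choices, not requirements):

import org.apache.spark.sql.functions.col

// Read Kafka records and project the payload as a string column
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "twitter")
  .option("startingOffsets", "earliest")
  .load()
  .select(col("value").cast("string").as("message"))

// Print incoming messages to the notebook output
val query = stream.writeStream
  .format("console")
  .outputMode("append")
  .start()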