I have a very simple setup with two Kafka topics, topic1 and topic2: a Spark Structured Streaming application reads from topic1, and another application takes the result and sends it to topic2.

Read:

import org.apache.spark.sql.{DataFrame, SparkSession}


object ReadKafkaTopic {
  def readStream(spark: SparkSession, brokers: String, topic: String): DataFrame = {
    spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("failOnDataLoss", false)
      .option("subscribe", topic)
      .option("startingOffsets", "latest")
      .load()
  }
}

Write:

import org.apache.spark.sql.DataFrame


object WriteKafkaTopic {
  def writeStream(df: DataFrame, brokers: String, topic: String): Unit = {
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("topic", topic)
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()
  }
}
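
For context, the two helpers above could be wired together roughly as follows. This is only a minimal sketch: the object name Topic1ToTopic2, the local[*] master, and the localhost:9092 broker address are illustrative assumptions, and it collapses the read and write sides into a single job for brevity. It relies on ReadKafkaTopic and WriteKafkaTopic as defined above and on spark-sql-kafka-0-10 being on the classpath.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical entry point, not part of the original project.
object Topic1ToTopic2 {
  def main(args: Array[String]): Unit = {
    // Assumption: a local Spark session is enough for a quick run.
    val spark = SparkSession
      .builder()
      .appName("topic1-to-topic2")
      .master("local[*]")
      .getOrCreate()

    // Assumption: a Kafka broker is reachable at this address.
    val brokers = "localhost:9092"

    // Read the raw Kafka records (key, value, topic, ...) from topic1.
    val input = ReadKafkaTopic.readStream(spark, brokers, "topic1")

    // Forward key and value, cast to strings, to topic2.
    WriteKafkaTopic.writeStream(input, brokers, "topic2")

    // Block until one of the started streaming queries terminates.
    spark.streams.awaitAnyTermination()
  }
}
```

Because writeStream returns Unit and discards the StreamingQuery handle, the sketch waits on spark.streams rather than on the individual query.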

This works wonderfully. Now I'm trying to test it against a local Kafka instance that is started for my tests (using net.manub.embeddedkafka.EmbeddedKafka).

And when I run my tests I get this error:

Cause: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.8

So, after reading about similar issues online, I pinned all my Jackson dependencies to 2.6.7 (after trying several other options, of course):

scalaVersion := "2.12.8"
val sparkVersion = "2.4.2"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.2.0"

// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion

// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion

// https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion % "provided"

// testing dependencies
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % "test"

libraryDependencies += "io.github.embeddedkafka" %% "embedded-kafka" % "2.2.0" % "test"

// https://mvnrepository.com/artifact/org.apache.curator/curator-test
libraryDependencies += "org.apache.curator" % "curator-test" % "4.2.0" % Test

dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.6.7"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-annotations" % "2.6.7"

// https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jdk8
dependencyOverrides += "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.6.7"

But I keep running into the same error: Incompatible Jackson version: 2.9.8

Has anyone run into this issue and found a way to fix it?
