I'm trying to load data into Kafka in Avro format using Spark Structured Streaming. I tried setting Confluent's default serializer and deserializer (io.confluent.kafka.serializers.AbstractKafkaAvroSerDe), but the documentation says the Kafka sink only accepts string or binary values.
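For context, the sink side of my query looks roughly like this (the broker address, topic name, and checkpoint path below are placeholders, and avroDF is assumed to already contain a binary "value" column):

    avroDF
      .writeStream
      .format("kafka")                                        // Spark's built-in Kafka sink
      .option("kafka.bootstrap.servers", "localhost:9092")    // placeholder broker address
      .option("topic", "avro-topic")                          // placeholder topic
      .option("checkpointLocation", "/tmp/kafka-checkpoint")  // placeholder checkpoint dir
      .start()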
So I'm trying to convert each JSON value to an Avro byte array and pass it to the value field that gets pushed to Kafka. Below is my code for converting JSON to Avro:
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream}

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

def getAvro(row: String, schema: String): Array[Byte] = {
  // parse the Avro schema and decode the JSON string against it
  val parser = new Schema.Parser
  val schema1 = parser.parse(schema)
  val byteinput = new ByteArrayInputStream(row.getBytes)
  val din = new DataInputStream(byteinput)
  val decoder = DecoderFactory.get().jsonDecoder(schema1, din)
  val reader = new GenericDatumReader(schema1)

  // helper that passes null where Avro's read/binaryEncoder expect a "reuse" argument
  def test[T <: AnyRef](o: Option[T]): T = o getOrElse null.asInstanceOf[T]

  val datum = reader.read(test(None), decoder)

  // re-encode the decoded record as Avro binary
  val outputStream = new ByteArrayOutputStream()
  val e = EncoderFactory.get().binaryEncoder(outputStream, test(None))
  new GenericDatumWriter(schema1).write(datum, e)
  e.flush()
  outputStream.toByteArray()
}
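The function is wired into the streaming query roughly like this (jsonDF, schemaString, and the column names are placeholders for my actual job; getAvro1 is the UDF wrapper that appears in the stack trace below):

    import org.apache.spark.sql.functions.{col, udf}

    // wrap getAvro in a UDF so it can be applied to the streaming DataFrame
    val getAvro1 = udf((row: String) => getAvro(row, schemaString))

    // the resulting Array[Byte] column becomes the binary "value" written to Kafka
    val avroDF = jsonDF.select(getAvro1(col("value").cast("string")).as("value"))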
When I run this I get the following error:
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to scala.runtime.Nothing$
  at stream$.getAvro(stream.scala:36)
  at stream$$anonfun$getAvro1$1$1.apply(stream.scala:71)
  at stream$$anonfun$getAvro1$1$1.apply(stream.scala:71)
  ... 16 more
Any help would be appreciated.