I'm using Spark 2.1's Structured Streaming to read from a Kafka topic whose contents are binary avro-encoded.
Thus, after setting up the DataFrame:
val messages = spark
  .readStream
  .format("kafka")
  .options(kafkaConf)
  .option("subscribe", config.getString("kafka.topic"))
  .load()
If I print the schema of this DataFrame (messages.printSchema()), I get the following:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: long (nullable = true)
|-- timestampType: integer (nullable = true)
This question should be orthogonal to the problem of Avro decoding, but let's assume I want to somehow convert the value content of the messages DataFrame into a Dataset[BusinessObject], via a function Array[Byte] => BusinessObject. For the sake of completeness, the function may just be (using avro4s):
import java.io.ByteArrayInputStream
import com.sksamuel.avro4s.AvroInputStream

case class BusinessObject(userId: String, eventId: String)

def fromAvro(bytes: Array[Byte]): BusinessObject =
  AvroInputStream.binary[BusinessObject](new ByteArrayInputStream(bytes)).iterator.next
Of course, as miguno says in this related question, I cannot just apply the transformation with a DataFrame.map(), because I need to provide an implicit Encoder for such a BusinessObject.
That can be defined as:
implicit val myEncoder: Encoder[BusinessObject] = org.apache.spark.sql.Encoders.kryo[BusinessObject]
Now, perform the map:
val transformedMessages: Dataset[BusinessObject] = messages.map(row => fromAvro(row.getAs[Array[Byte]]("value")))
But if I query the new schema, I get the following:
root
|-- value: binary (nullable = true)
And I think that does not make any sense, as the Dataset should use the Product properties of the BusinessObject case class and get the correct values.
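For comparison, here is a minimal sketch of the schema I would expect, built from a plain, non-streaming Dataset of the same case class (this assumes a SparkSession named spark with its implicit product encoders in scope, and that the Kryo encoder above is not defined in that scope):

import spark.implicits._  // derived encoders for case classes (Product types)

// A toy batch Dataset, just to show the schema a product encoder produces
val expected = Seq(BusinessObject("user-1", "event-1")).toDS()
expected.printSchema()
// root
//  |-- userId: string (nullable = true)
//  |-- eventId: string (nullable = true)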
I've seen some examples of Spark SQL using .schema(StructType) in the reader, but I cannot do that, not just because I'm using readStream, but because I actually have to transform the column before being able to operate on such fields.
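For reference, the reader pattern I mean is something like the following sketch, shown here against a hypothetical JSON source (the path is made up), which is not an option for the Kafka source above:

import org.apache.spark.sql.types._

// Declaring the schema up front when reading a source such as JSON.
// The Kafka source, by contrast, always exposes the fixed key/value/topic/... schema shown earlier.
val declared = StructType(Seq(
  StructField("userId", StringType),
  StructField("eventId", StringType)
))

val fromJson = spark.read
  .schema(declared)
  .json("/path/to/events")  // hypothetical path, for illustration only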
I am hoping to tell the Spark SQL engine that the transformedMessages Dataset schema is a StructType with the case class' fields.
I would say you get exactly what you ask for. As I already explained today, Encoders.kryo generates a blob with the serialized object. Its internal structure is opaque to the SQL engine and cannot be accessed without deserializing the object. So, effectively, what your code does is take one serialization format and replace it with another.
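This is easy to see directly from the encoder itself (a small sketch; the output is shown as a comment):

import org.apache.spark.sql.Encoders

// The Kryo encoder exposes a single binary column to the SQL engine;
// everything inside it is an opaque blob.
Encoders.kryo[BusinessObject].schema.printTreeString()
// root
//  |-- value: binary (nullable = true)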
Another problem you have is that you try to mix a dynamically typed DataFrame (Dataset[Row]) with statically typed objects. Excluding the UDT API, Spark SQL doesn't work like this. Either you use a statically typed Dataset, or a DataFrame with the object structure encoded using a struct hierarchy.

The good news is that simple product types like BusinessObject should work just fine without any need for the clumsy Encoders.kryo. Just skip the Kryo encoder definition and be sure to import the implicit encoders:
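A minimal sketch of how that looks, reusing the messages DataFrame and the fromAvro function from the question (only the map and the resulting schema are shown; the streaming sink is omitted):

import spark.implicits._  // derived encoders for case classes (Product types); no Kryo needed

val transformedMessages: Dataset[BusinessObject] =
  messages.map(row => fromAvro(row.getAs[Array[Byte]]("value")))

transformedMessages.printSchema()
// root
//  |-- userId: string (nullable = true)
//  |-- eventId: string (nullable = true)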