Why do Spark DataFrames not change their schema and what to do about it?

I'm using Spark 2.1's Structured Streaming to read from a Kafka topic whose messages are binary, Avro-encoded records.

Thus, after setting up the DataFrame:

val messages = spark
  .readStream
  .format("kafka")
  .options(kafkaConf)
  .option("subscribe", config.getString("kafka.topic"))
  .load()

If I print the schema of this DataFrame (messages.printSchema()), I get the following:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- timestampType: integer (nullable = true)

This question should be orthogonal to the problem of Avro decoding, but let's assume I want to convert the value column of the messages DataFrame into a Dataset[BusinessObject] using a function Array[Byte] => BusinessObject. To make the example complete, the function could simply be (using avro4s):

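import java.io.ByteArrayInputStream
import com.sksamuel.avro4s.AvroInputStream

case class BusinessObject(userId: String, eventId: String)

// Decode a single binary Avro record into a BusinessObject.
def fromAvro(bytes: Array[Byte]): BusinessObject =
    AvroInputStream.binary[BusinessObject](
        new ByteArrayInputStream(bytes)
    ).iterator.next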

Of course, as miguno says in this related question, I cannot simply apply the transformation with DataFrame.map(), because I need to provide an implicit Encoder for BusinessObject.

That can be defined as:

implicit val myEncoder : Encoder[BusinessObject] = org.apache.spark.sql.Encoders.kryo[BusinessObject]

Now, perform the map:

val transformedMessages : Dataset[BusinessObject] = messages.map(row => fromAvro(row.getAs[Array[Byte]]("value")))

But if I query the new schema, I get the following:

root
 |-- value: binary (nullable = true)

That does not make sense to me: I expected the Dataset to use the Product properties of the BusinessObject case class and expose its fields as columns.

I've seen some Spark SQL examples that use .schema(StructType) on the reader, but I cannot do that, not only because I'm using readStream, but because I have to transform the column before I can operate on those fields.

I am hoping to tell the Spark SQL engine that the schema of the transformedMessages Dataset is a StructType built from the case class's fields.

There is 1 answer

zero323 (best answer)

I would say you get exactly what you asked for. As I already explained today, Encoders.kryo generates a blob containing the serialized object. Its internal structure is opaque to the SQL engine and cannot be accessed without deserializing the object. So effectively your code takes one serialization format and replaces it with another.

Another problem is that you are trying to mix a dynamically typed DataFrame (Dataset[Row]) with a statically typed object. Apart from the UDT API, Spark SQL doesn't work like this. Either you use a statically typed Dataset, or a DataFrame with the object structure encoded as a struct hierarchy.
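To illustrate the second option (a DataFrame whose object structure is encoded as a struct), here is a minimal sketch. It makes several assumptions that are not in the question: decode is a hypothetical stand-in for fromAvro that parses "userId,eventId" from UTF-8 bytes, the input is a small batch DataFrame rather than the Kafka stream, and the names StructColumnSketch, decodeUdf and withStruct are invented for the example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

case class BusinessObject(userId: String, eventId: String)

object StructColumnSketch {
  // Hypothetical stand-in for fromAvro: parses "userId,eventId" from UTF-8 bytes.
  def decode(bytes: Array[Byte]): BusinessObject = {
    val Array(userId, eventId) = new String(bytes, "UTF-8").split(",", 2)
    BusinessObject(userId, eventId)
  }

  // A UDF returning a case class produces a struct column whose fields
  // mirror the case class fields.
  val decodeUdf = udf((bytes: Array[Byte]) => decode(bytes))

  // Stay in the untyped DataFrame API: decode "value" into a struct,
  // then flatten the struct into top-level columns.
  def withStruct(messages: DataFrame): DataFrame =
    messages
      .select(decodeUdf(col("value")).as("event"))
      .select("event.*")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("struct-column-sketch")
      .getOrCreate()
    import spark.implicits._

    // Batch stand-in for the Kafka source: one binary column named "value".
    val messages = Seq("u1,e1".getBytes("UTF-8")).toDF("value")

    withStruct(messages).printSchema() // columns: userId, eventId
    spark.stop()
  }
}
```

The result is still a DataFrame, so the fields are visible to the SQL engine without any custom Encoder.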

The good news is that simple product types like BusinessObject should work just fine without any need for the clumsy Encoders.kryo. Just skip the Kryo encoder definition and be sure to import the implicit encoders:

import spark.implicits._
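
As a rough sketch of the difference, the following compares the product encoder that comes from spark.implicits._ with Encoders.kryo. The assumptions here are mine, not the question's: decode is a hypothetical stand-in for fromAvro that parses "userId,eventId" from UTF-8 bytes, a small batch DataFrame replaces the Kafka stream, and the names ProductEncoderSketch, typed and opaque are invented for the example.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}

case class BusinessObject(userId: String, eventId: String)

object ProductEncoderSketch {
  // Hypothetical stand-in for fromAvro: parses "userId,eventId" from UTF-8 bytes.
  def decode(bytes: Array[Byte]): BusinessObject = {
    val Array(userId, eventId) = new String(bytes, "UTF-8").split(",", 2)
    BusinessObject(userId, eventId)
  }

  // Product encoder from spark.implicits._: one column per case class field.
  def typed(spark: SparkSession, messages: DataFrame): Dataset[BusinessObject] = {
    import spark.implicits._
    messages.select("value").as[Array[Byte]].map(bytes => decode(bytes))
  }

  // Kryo encoder, for contrast: the whole object becomes one binary column.
  def opaque(spark: SparkSession, messages: DataFrame): Dataset[BusinessObject] = {
    import spark.implicits._
    messages.select("value").as[Array[Byte]]
      .map(bytes => decode(bytes))(Encoders.kryo[BusinessObject])
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("product-encoder-sketch")
      .getOrCreate()
    import spark.implicits._

    // Batch stand-in for the Kafka source: one binary column named "value".
    val messages = Seq("u1,e1".getBytes("UTF-8")).toDF("value")

    typed(spark, messages).printSchema()  // columns: userId, eventId
    opaque(spark, messages).printSchema() // single binary column: value
    spark.stop()
  }
}
```

Selecting the value column and converting it with as[Array[Byte]] before the map keeps the whole pipeline statically typed, so Row.getAs is not needed.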