How to encode structs into Avro record in Spark?


I'm trying to use the to_avro() function to create Avro records. However, I'm not able to encode multiple columns: some columns are simply lost after encoding. A simple example to reproduce the problem:

import org.apache.spark.sql.Row
import org.apache.spark.sql.avro.{from_avro, to_avro}
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(List(
  StructField("entity_type", StringType),
  StructField("entity", StringType)
))
val rdd = sc.parallelize(Seq(
  Row("PERSON", "John Doe")
))
val df = sqlContext.createDataFrame(rdd, schema)

df
  .withColumn("struct", struct(col("entity_type"), col("entity")))
  .select("struct")
  .collect()
  .foreach(println)

// prints [[PERSON, John Doe]]
df
  .withColumn("struct", struct(col("entity_type"), col("entity")))
  .select(to_avro(col("struct")).as("value"))
  .select(from_avro(col("value"), entitySchema).as("entity")) // entitySchema is the JSON schema string shown below
  .collect()
  .foreach(println)

// prints [[, PERSON]]

My schema looks like this:

{
  "type" : "record",
  "name" : "Entity",
  "fields" : [ {
    "name" : "entity_type",
    "type" : "string"
  },
  {
    "name" : "entity",
    "type" : "string"
  } ]
}

Interestingly, if I change the column order in the struct, the result is [, John Doe].

I'm using Spark 2.4.5. According to Spark documentation: "to_avro() can be used to turn structs into Avro records. This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka."

1 Answer

Answered by Gorionovic:

It works after changing the field types from "string" to ["string", "null"]. I'm not sure whether this behavior is intended, though.
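For reference, the reader schema that the answer describes would look like the sketch below. A plausible explanation (an assumption, not confirmed by the source) is that the struct's fields are nullable in Spark, so to_avro encodes each one as an Avro union of the type and null; a reader schema declaring a plain "string" then mis-aligns with the encoded bytes, which would account for the shifted/empty fields.

```json
{
  "type" : "record",
  "name" : "Entity",
  "fields" : [ {
    "name" : "entity_type",
    "type" : [ "string", "null" ]
  },
  {
    "name" : "entity",
    "type" : [ "string", "null" ]
  } ]
}
```

Alternatively, keeping the original non-union schema should also work if the columns are made non-nullable on the Spark side (e.g. StructField("entity_type", StringType, nullable = false)), since the writer and reader schemas would then agree.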