I'm trying to use the to_avro() function to create Avro records. However, I'm not able to encode multiple columns: some columns are simply lost after encoding. A simple example to reproduce the problem:
import org.apache.spark.sql.Row
import org.apache.spark.sql.avro.{from_avro, to_avro}
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(List(
StructField("entity_type", StringType),
StructField("entity", StringType)
))
val rdd = sc.parallelize(Seq(
Row("PERSON", "John Doe")
))
val df = sqlContext.createDataFrame(rdd, schema)
df
.withColumn("struct", struct(col("entity_type"), col("entity")))
.select("struct")
.collect()
.foreach(println)
// prints [[PERSON, John Doe]]
df
.withColumn("struct", struct(col("entity_type"), col("entity")))
.select(to_avro(col("struct")).as("value"))
.select(from_avro(col("value"), entitySchema).as("entity")) // entitySchema is the Avro schema JSON below, as a String
.collect()
.foreach(println)
// prints [[, PERSON]]
My Avro schema (entitySchema) looks like this:
{
  "type": "record",
  "name": "Entity",
  "fields": [
    { "name": "entity_type", "type": "string" },
    { "name": "entity", "type": "string" }
  ]
}
Interestingly, if I change the column order in the struct, the result becomes [, John Doe].
I'm using Spark 2.4.5. According to the Spark documentation: "to_avro() can be used to turn structs into Avro records. This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka."
It works after changing the field types from "string" to ["string", "null"]. Not sure if this behavior is intended, though.
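For reference, this is the schema variant that works for me. My understanding (an assumption, not confirmed in the docs) is that the columns in the struct are nullable in the DataFrame schema, so to_avro encodes them as nullable unions, and from_avro then has to be given matching union types to decode the bytes correctly:

```
{
  "type": "record",
  "name": "Entity",
  "fields": [
    { "name": "entity_type", "type": ["string", "null"] },
    { "name": "entity", "type": ["string", "null"] }
  ]
}
```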