Unable to convert Avro data back to a Spark DataFrame


I have a DataFrame like this:

raw_df.show()
raw_df.printSchema()
+---+
| id|
+---+
|  a|
|  b|
|  c|
+---+

root
 |-- id: string (nullable = true)
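
For reference, raw_df can be reproduced with something like this (a minimal sketch; any single-string-column DataFrame will do):

from pyspark.sql import SparkSession

# Build a session with the spark-avro package on the classpath.
spark = (SparkSession.builder
         .config("spark.jars.packages",
                 "org.apache.spark:spark-avro_2.12:3.4.1")
         .getOrCreate())

raw_df = spark.createDataFrame([("a",), ("b",), ("c",)], ["id"])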

I am able to convert this DataFrame to Avro using spark-avro's to_avro function and the following schema:

schema_v0 = """
{
  "type": "record",
  "name": "Value",
  "namespace": "bets",
  "fields": [
    {
      "name": "id",
      "type": "string"
    }
  ],
  "connect.name": "bets.Value"
}
"""
from pyspark.sql import functions as F
from pyspark.sql.avro.functions import to_avro

encoded_df = raw_df.select(to_avro(F.struct(*raw_df.columns),
                                   jsonFormatSchema=schema_v0
                                  ).alias("avro_encoded_value"))
encoded_df.show()
+------------------+
|avro_encoded_value|
+------------------+
|           [02 61]|
|           [02 62]|
|           [02 63]|
+------------------+
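
To check my expectation outside of Spark, the same bytes decode cleanly with plain Avro schema resolution. A minimal sketch using the fastavro package (my choice for illustration only, not part of the Spark pipeline):

import io
import json
import fastavro

writer_schema = fastavro.parse_schema(json.loads(schema_v0))
reader_schema = fastavro.parse_schema(json.loads(schema_v1))

# First row: varint 0x02 is the zigzag-encoded string length 1, 0x61 is 'a'.
raw_bytes = bytes([0x02, 0x61])

# Writer schema alone -> {'id': 'a'}
print(fastavro.schemaless_reader(io.BytesIO(raw_bytes), writer_schema))

# Writer + reader schema: standard Avro resolution should fill the missing
# field from its default -> {'id': 'a', 'redeemed': False}
print(fastavro.schemaless_reader(io.BytesIO(raw_bytes), writer_schema,
                                 reader_schema))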

However, when I try to convert this Avro-encoded data back to a Spark DataFrame using another schema that adds an optional, nullable column called redeemed, I get org.apache.spark.SparkException: Malformed records are detected in record parsing.

schema_v1 = """
{
  "type": "record",
  "name": "Value",
  "namespace": "bets",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "redeemed",
      "type": ["boolean", "null"],
      "default": false
    }
  ],
  "connect.name": "bets.Value"
}
"""
from pyspark.sql.avro.functions import from_avro

decoded_df = (encoded_df
              .select(from_avro("avro_encoded_value", schema_v1)
                      .alias("avro_decoded_value"))
              .select("avro_decoded_value.*"))
decoded_df.show()
decoded_df.printSchema()

As I understand it, the default value should be applied during decoding, filling in the field when it is absent from the encoded data, correct? Why is this not happening? For reference, I am using Spark 3.4.1 and org.apache.spark:spark-avro_2.12:3.4.1.
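
I can work around this by decoding with the writer schema (schema_v0) and adding the new column with its default on the Spark side (a sketch, not what I want long term):

from pyspark.sql import functions as F
from pyspark.sql.avro.functions import from_avro

# Decode with the schema the data was actually written with, then add the
# new field with the default value declared in schema_v1.
decoded_v0 = (encoded_df
              .select(from_avro("avro_encoded_value", schema_v0)
                      .alias("avro_decoded_value"))
              .select("avro_decoded_value.*")
              .withColumn("redeemed", F.lit(False)))
decoded_v0.show()

but I would still like to understand why from_avro does not apply the reader-schema default itself.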
