I am trying to use Databricks Autoloader for a very simple use case:
Reading JSONs from S3 and loading them into a delta table, with schema inference and evolution.
This is my code:
self.spark \
.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.inferColumnTypes", "true") \
.option("cloudFiles.schemaLocation", f"{self.target_s3_bucket}/_schema/{source_table_name}") \
.load(f"{self.source_s3_bucket}/{source_table_name}") \
.distinct() \
.writeStream \
.trigger(availableNow=True) \
.format("delta") \
.option("mergeSchema", "true") \
.option("checkpointLocation", f"{self.target_s3_bucket}/_checkpoint/{source_table_name}") \
.option("streamName", source_table_name) \
.start(f"{self.target_s3_bucket}/{target_table_name}")
When a JSON with an unknown column arrives, the Stream fails, as expected, with a NEW_FIELDS_IN_RECORD_WITH_FILE_PATH
exception.
But when I retry the job, I get the following exception:
StateSchemaNotCompatible: Provided schema doesn't match to the schema for existing state! Please note that Spark allow difference of field name: check count of fields and data type of each field.
This is my first time using Autoloader, am I doing something obviously wrong?
You are using
.distinct()
which creates a state for which changes in schema are not allowed. This is an expected behaviour, you may find more info here: https://docs.databricks.com/en/structured-streaming/query-recovery.html#:~:text=Changes%20in%20stateful%20operations