Databricks Autoloader Schema Evolution throws StateSchemaNotCompatible exception


I am trying to use Databricks Autoloader for a very simple use case:

Reading JSONs from S3 and loading them into a Delta table, with schema inference and evolution.

This is my code:

self.spark \
      .readStream \
      .format("cloudFiles") \
      .option("cloudFiles.format", "json") \
      .option("cloudFiles.inferColumnTypes", "true") \
      .option("cloudFiles.schemaLocation", f"{self.target_s3_bucket}/_schema/{source_table_name}") \
      .load(f"{self.source_s3_bucket}/{source_table_name}") \
      .distinct() \
      .writeStream \
      .trigger(availableNow=True) \
      .format("delta") \
      .option("mergeSchema", "true") \
      .option("checkpointLocation", f"{self.target_s3_bucket}/_checkpoint/{source_table_name}") \
      .option("streamName", source_table_name) \
      .start(f"{self.target_s3_bucket}/{target_table_name}")

When a JSON with an unknown column arrives, the stream fails, as expected, with a NEW_FIELDS_IN_RECORD_WITH_FILE_PATH exception.

But when I retry the job, I get the following exception:

StateSchemaNotCompatible: Provided schema doesn't match to the schema for existing state! Please note that Spark allow difference of field name: check count of fields and data type of each field.

This is my first time using Autoloader, am I doing something obviously wrong?


There are 2 answers

r3stle55 (accepted answer):

You are using .distinct(), which is a stateful operation: Spark keeps state for it across micro-batches, and changes to the schema of that state are not allowed between restarts. This is expected behaviour; see "Changes in stateful operations" in the query-recovery docs: https://docs.databricks.com/en/structured-streaming/query-recovery.html#:~:text=Changes%20in%20stateful%20operations
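
If you do not actually need cross-batch deduplication, one way around this (a minimal sketch, not tested against your pipeline) is to drop the stateful .distinct() from the streaming query and deduplicate inside foreachBatch instead; there, batch_df is a plain batch DataFrame, so dropDuplicates() keeps no streaming state and schema evolution is unaffected:

def append_deduped(batch_df, batch_id):
    # Batch-local deduplication: no state store, so no state schema to break.
    batch_df.dropDuplicates() \
        .write \
        .format("delta") \
        .option("mergeSchema", "true") \
        .mode("append") \
        .save(target_path)  # target_path: your Delta table path (assumed defined)

# ... .writeStream \
#     .trigger(availableNow=True) \
#     .option("checkpointLocation", checkpoint_path) \
#     .foreachBatch(append_deduped) \
#     .start()

Note that this only removes duplicates within each micro-batch, not across batches.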

VonC (7 votes):

You point out that the problem happens on retry, so it is probably a Spark Structured Streaming problem rather than an Autoloader one; that you want to use schema inference rather than provide the schema manually; and that you expect this to work by default, since it is an elementary ETL use case.

In your case:

  • A StateSchemaNotCompatible exception typically occurs when there is a mismatch between the schema inferred in the current stream and the schema stored in the checkpoint or state store. It is likely that, upon retrying the job, Spark compares the new inferred schema with the old schema stored in the checkpoint and finds inconsistencies.

  • Schema evolution with addNewColumns: Autoloader is expected to update the schema with new columns as they are detected. However, if the checkpoint data is not in sync with these updates, the mismatch can lead to the StateSchemaNotCompatible issue.


When you encounter a schema-related error, consider clearing the checkpoint data before retrying the stream. That allows Spark to start afresh with schema inference and evolution, unconstrained by the previous state (but it can lead to reprocessing of data).

The checkpoint data is stored in the location specified by the checkpointLocation option of your streaming query; in your case, f"{self.target_s3_bucket}/_checkpoint/{source_table_name}". Access the S3 bucket where your checkpoint data is stored (through the AWS Management Console, the AWS CLI, or an SDK) and delete the contents of the checkpoint directory. Make sure not to delete the directory itself, just its contents.

aws s3 rm s3://[your-target-s3-bucket]/_checkpoint/[source-table-name]/ --recursive
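
Equivalently with boto3 (the bucket name and prefix below are placeholders; this deletes the objects under the prefix, not the prefix itself):

import boto3

s3 = boto3.resource("s3")
bucket = s3.Bucket("your-target-s3-bucket")
# Delete every object under the checkpoint prefix.
bucket.objects.filter(Prefix="_checkpoint/source-table-name/").delete()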

After clearing the checkpoint data, restart your streaming job. Spark Structured Streaming will treat this as a new query and reinitialize the checkpoint data from scratch.

Again, this is for testing, as clearing checkpoint data can lead to reprocessing of data that was already handled in the previous run. Make sure your system can cope with this without causing data duplication or other issues. And... it might be prudent to back up the checkpoint data before deleting it, especially if you are dealing with critical data streams.
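
A backup can be as simple as an aws s3 sync to a separate location before the rm (the backup bucket name is a placeholder):

aws s3 sync s3://[your-target-s3-bucket]/_checkpoint/[source-table-name]/ s3://[your-backup-bucket]/_checkpoint_backup/[source-table-name]/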

See also spark GroupBy throws StateSchemaNotCompatible exception with different "Existing key schema", where the root cause was likewise the state stored in the checkpoint directory: as that user noted, pointing to a new checkpoint directory solved the issue. It amounts to the same thing as clearing or changing the checkpoint location; the old state stored in the checkpoint directory was incompatible with the new schema, which led to the exception.
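
The minimal version of that workaround is to change only the checkpointLocation option, leaving everything else untouched (_checkpoint_v2 is just an example name):

.option("checkpointLocation", f"{self.target_s3_bucket}/_checkpoint_v2/{source_table_name}") \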


Another approach: instead of completely relying on schema inference, consider updating the schema incrementally as new fields are detected. That can be more manageable than dealing with significant schema changes at once. Instead of solely relying on Autoloader's schema inference and evolution, you would manually adjust the schema as new fields are detected.

Regularly check for new fields in your incoming data. That can be done by analyzing the data files in S3 or by inspecting the schema in the _schema directory set in cloudFiles.schemaLocation. When a new field is detected, manually update the schema. You can modify the schema definition in your Spark job to include the new fields. That can be done by modifying the StructType definition if you are using one, or by setting new schema hints. After updating the schema, restart your streaming job. With the new schema in place, Spark Structured Streaming should be able to handle the new fields without throwing the StateSchemaNotCompatible exception.
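
If you prefer hints over a full StructType, Autoloader's cloudFiles.schemaHints option takes DDL-style column definitions; the column names below are placeholders for whatever new fields you detect:

.option("cloudFiles.schemaHints", "newField1 STRING, newField2 INT") \

With hints, the rest of the schema is still inferred; only the hinted columns are pinned to the given types.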

Suppose you have detected new fields newField1 and newField2 that need to be added.
Your StructType schema would be:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Existing schema definition
schema = StructType([
    StructField("existingField1", StringType(), True),
    StructField("existingField2", IntegerType(), True),
    # other existing fields 
])

# Add new fields
schema.add(StructField("newField1", StringType(), True))
schema.add(StructField("newField2", IntegerType(), True))

# Use the updated schema in your readStream
self.spark \
    .readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .schema(schema) \
    .option("cloudFiles.schemaLocation", f"{self.target_s3_bucket}/_schema/{source_table_name}") \
    .load(f"{self.source_s3_bucket}/{source_table_name}") \
    .distinct() \
    .writeStream \
    .trigger(availableNow=True) \
    .format("delta") \
    .option("mergeSchema", "true") \
    .option("checkpointLocation", f"{self.target_s3_bucket}/_checkpoint/{source_table_name}") \
    .option("streamName", source_table_name) \
    .start(f"{self.target_s3_bucket}/{target_table_name}")

As with the previous approach, this is for testing: it requires regular monitoring and manual intervention, which might not be feasible for streams with very frequent schema changes. It would also need robust error handling to gracefully manage records that do not match the expected schema.


Last approach: look into the schema stored in the _schema location after the first failure and compare it with the schema inferred during the retry. That might reveal which specific differences are causing the issue. You would need to examine both the current schema and the schema stored from previous runs, typically found in the checkpoint or schema location.

Obtain the current schema by inspecting the structure of the new data files in S3 or the schema of the DataFrame just before writing to the stream. The previously used schema is stored in the _schema directory specified in your cloudFiles.schemaLocation.
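
On Databricks, Auto Loader persists its inferred schema versions under a _schemas subdirectory of the schemaLocation, so a quick way to eyeball them (a sketch; the exact paths are assumptions based on your options) is:

# List and print the schema versions Auto Loader has written so far.
schema_dir = f"{self.target_s3_bucket}/_schema/{source_table_name}/_schemas"
for entry in dbutils.fs.ls(schema_dir):
    print(entry.path)
    print(dbutils.fs.head(entry.path))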

Use tools or scripts to compare the current schema with the stored schema. Look for differences in field names, types, and the structure (like nested fields). Identify what changes between the schemas are causing the StateSchemaNotCompatible exception. Common issues include additional fields in the new data, changes in data types, or changes in the structure of nested fields.
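
A small helper along these lines (hypothetical, but using only standard pyspark.sql.types) makes the comparison mechanical:

from pyspark.sql.types import StructType

def schema_diff(old: StructType, new: StructType):
    # Compare two schemas by field name and data type.
    old_fields = {f.name: f.dataType for f in old.fields}
    new_fields = {f.name: f.dataType for f in new.fields}
    added = sorted(set(new_fields) - set(old_fields))
    removed = sorted(set(old_fields) - set(new_fields))
    changed = sorted(n for n in set(old_fields) & set(new_fields)
                     if old_fields[n] != new_fields[n])
    return added, removed, changed

Note that this only compares top-level fields; nested structs would need a recursive variant.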

Once you identify the inconsistencies, adjust your streaming job to handle these changes. That could mean updating the schema manually, as discussed earlier, or implementing a strategy to handle unexpected fields.

Adding logging in your Spark job would help capture schema information at various points. That makes it easier to trace back when and how the schema changes occurred.
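
For instance (df being the streaming DataFrame returned by your readStream chain):

import logging

logger = logging.getLogger("autoloader-schema")
# df.schema.json() serializes the full schema, nested fields included.
logger.info("Schema before writeStream: %s", df.schema.json())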


You reply that the changes in the schema are apparent and also stated in the StateSchemaNotCompatible exception: it is the unknown column that arrives. You do not understand why the Structured Streaming state fails; you would expect Autoloader to handle this.

Autoloader is designed to handle schema evolution by updating the schema when new columns are detected. However, the state management within Spark Structured Streaming might still encounter issues if there is a mismatch between the existing state's schema and the newly inferred schema. That can happen if the state stored in the checkpoint directory is not in sync with these schema changes, as mentioned before.

In a typical scenario, when Autoloader detects a new column, it should fail the stream, update the schema, and then pick up the new schema on restart. If this process is not happening smoothly, it could be because:

  • the checkpoint state is not updating correctly to reflect the new schema;
  • there are nuances in how Autoloader handles schema evolution, specific to your data and your Spark version;
  • your JSON data has complex nested structures.

You also confirm that the only change is the addition of a column, not a change of type.

That clarifies that the StateSchemaNotCompatible issue is only due to new columns, which should, in theory, be handled seamlessly by Autoloader's addNewColumns mode.
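
For reference, addNewColumns is the default evolution mode when no schema is provided, but you can set it explicitly to rule out a configuration surprise:

.option("cloudFiles.schemaEvolutionMode", "addNewColumns") \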

Given this scenario, it is indeed unexpected that the problem seems to stem from the state management of Spark Structured Streaming rather than the Autoloader.

You might consider:

  • a specific issue in how Spark's state management handles schema updates, particularly in the context of schema evolution with new columns.

  • a behavior specific to the version of Spark or Databricks you are using. Making sure you are on the latest version could help, as recent updates might have addressed such issues.

  • a synchronization issue between the schema evolution managed by Autoloader and the state stored in the checkpoint. That could potentially be a bug or a limitation in the current system.