Schema evolution with Autoloader

139 views Asked by At

I have a schema evolution case.

Detailed description:

Im laoding the source tables from datalake to bronze layer aas row data using autoloader and foreachbatch with function of merge into statemenet. which is having no problem to do the job.

While moving from bronze to sivler, as a source table I apply the select statement to filter out extra columns when moving into silver layer.

I have the problem with only one table.

Table customeraddress in the Bronze layer have the column MSFT_DATASTATE which is not the case of the same table in silver layer. So i want to add this column autoamtically to my silver table.

# Enable autoMerge for schema evolution
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

p = re.compile('^BK_')
list_of_columns = dfUpdates.columns
list_of_BK_columns = [ s for s in dfUpdates.columns if p.match(s) ]
string = ''
for column in list_of_BK_columns:
    string += f'table.{column} = newData.{column} and '
dictionary = {}
for key in list_of_columns:
    dictionary[key] = f'newData.{key}'

# print("printing " + cdm + " columns")
print("We at this stage now -----------------------------------------------------")
# print(dfUpdates.columns)

deltaTable = DeltaTable.forPath(spark, f"abfss://silver@{storage_account}.dfs.core.windows.net/D365/{table.lower()}_ao")
deltaTable.alias('table') \
    .merge(dfUpdates.alias("newData"), string) \
    .whenMatchedUpdate(set=dictionary) \
    .whenNotMatchedInsert(values=dictionary) \
    .execute()

df.writeStream.foreachBatch(lambda df, epochId: update_changefeed(df, table, epochId)).option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start()

Error Im getting states that:

SET column ``` not found given columns: [PK_D365_customeraddress, IsDelete, etc]

which is correct, MSFT_DATASTATE column is not in my silver deltatable.

1

There are 1 answers

5
kuldeep ghosh On

Ref: https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html Try and use this:

# Add the mergeSchema option
loans.write.format("delta") \
           .option("mergeSchema", "true") \
           .mode("append") \
           .save(DELTALAKE_SILVER_PATH)