I have a schema evolution case.
Detailed description:
I'm loading the source tables from the data lake into the bronze layer as raw data, using Auto Loader and foreachBatch with a function that runs a MERGE INTO statement. That part of the job works without any problem.
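Roughly, the bronze read looks like this (a simplified sketch; the container path, schema_directory and the "parquet" file format are placeholders, not my exact values):

# Simplified sketch of the Auto Loader read (paths/format are placeholders)
df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation", schema_directory) \
    .load(f"abfss://datalake@{storage_account}.dfs.core.windows.net/D365/{table.lower()}")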
While moving from bronze to silver, I apply a select statement on the source table to filter out extra columns before writing into the silver layer.
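The filtering itself is just a projection, something like this (sketch; silver_columns stands for the list of columns the silver table should keep):

# Keep only the columns the silver table should have (silver_columns is a placeholder)
dfUpdates = df.select([c for c in df.columns if c in silver_columns])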
I have a problem with only one table: customeraddress. In the bronze layer it has the column MSFT_DATASTATE, which the same table in the silver layer does not. I want this column to be added to my silver table automatically.
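(I know I could add the column by hand, e.g. with something like the sketch below, where the STRING type is an assumption on my part, but I want schema drift handled automatically.)

# Manual workaround I'd rather avoid (column type is an assumption)
spark.sql(f"""
    ALTER TABLE delta.`abfss://silver@{storage_account}.dfs.core.windows.net/D365/{table.lower()}_ao`
    ADD COLUMNS (MSFT_DATASTATE STRING)
""")

This is my current bronze-to-silver merge code: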
# Enable autoMerge for schema evolution
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

import re
from delta.tables import DeltaTable

# The merge below runs inside update_changefeed, which foreachBatch calls
# for every micro-batch; dfUpdates is the filtered batch dataframe.

# Business-key columns (BK_ prefix) make up the join condition
p = re.compile(r'^BK_')
list_of_columns = dfUpdates.columns
list_of_BK_columns = [s for s in list_of_columns if p.match(s)]

# Build "table.BK_x = newData.BK_x and ..." without a trailing "and"
merge_condition = ' and '.join(
    f'table.{column} = newData.{column}' for column in list_of_BK_columns
)

# Map every source column onto the target column of the same name
update_map = {key: f'newData.{key}' for key in list_of_columns}

deltaTable = DeltaTable.forPath(
    spark,
    f"abfss://silver@{storage_account}.dfs.core.windows.net/D365/{table.lower()}_ao"
)
deltaTable.alias('table') \
    .merge(dfUpdates.alias("newData"), merge_condition) \
    .whenMatchedUpdate(set=update_map) \
    .whenNotMatchedInsert(values=update_map) \
    .execute()

df.writeStream \
    .foreachBatch(lambda df, epochId: update_changefeed(df, table, epochId)) \
    .option("checkpointLocation", checkpoint_directory) \
    .trigger(availableNow=True) \
    .start()
The error I'm getting states:

SET column `MSFT_DATASTATE` not found given columns: [`PK_D365_customeraddress`, `IsDelete`, etc.]

which is correct: the MSFT_DATASTATE column is not in my silver Delta table.
Ref: https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html (this is where the autoMerge setting above comes from).
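For what it's worth, my understanding from that post and the Delta Lake docs is that automatic schema evolution during MERGE only applies to the updateAll/insertAll actions, not to explicit column maps that reference columns missing from the target. So a likely fix, a sketch I haven't verified against this exact pipeline, would be:

# With autoMerge on, updateAll/insertAll let the MERGE add new source
# columns (like MSFT_DATASTATE) to the target schema automatically
deltaTable.alias('table') \
    .merge(dfUpdates.alias("newData"), merge_condition) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()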