My Delta table has a problem updating its schema and inferring the new column. The problem occurs with only one table; the rest of the tables work fine. The code below is dynamic, so there is no difference in how I run it for different tables.
Detailed description:
Step 1.
Reading the dataframe and running another notebook to generate the SQL statement as a string, and enabling autoMerge for schema evolution:
import re

from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# Enable autoMerge for schema evolution
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
query = dbutils.notebook.run(
    f"/bronze views/VW_D365_{table.upper()}",
    3600,
    {
        "append_only_mode": "yes",
        "incremental": "yes",
        "catalog": catalog,
        "schema": schema,
    },
)
df = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .table(f"{catalog}.bronze.d365_{table.lower()}_ao")
)
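For context, the string returned by the notebook is just a SELECT statement over the global temp view that update_changefeed() registers for every micro-batch in Step 2. A simplified, purely hypothetical example of what such a query could look like (the column names are made up for illustration):

# Hypothetical illustration only -- the real statement comes from the VW_D365_* notebook.
# It reads from the global_temp database because the batch dataframe is registered
# with createOrReplaceGlobalTempView in Step 2.
example_query = """
SELECT
    customeraddressid AS BK_D365_customeraddress,  -- hypothetical business-key column; the merge keys on BK_* columns
    name,
    MSFT_DATASTATE,
    modifiedon
FROM global_temp.tmp_D365_customeraddress
"""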
Step 2.
Then I define the function that will be passed to foreachBatch when starting to write the data into the target table.
def update_changefeed(df, table, query, epochId):
    # Doing some transformation and picking up the necessary data
    filtered_df = df.filter(
        col("_change_type").isin("insert", "update_postimage", "delete")
    )
    filtered_df = filtered_df.drop(
        "_commit_timestamp", "_change_type", "_commit_version"
    )
    # Keeping only the latest version of each record per id
    w = Window.partitionBy("id").orderBy(F.col("modifiedon").desc())
    filtered_df = (
        filtered_df.withWatermark("modifiedon", "1 second")
        .withColumn("rn", F.row_number().over(w))
        .where(F.col("rn") == 1)
        .drop("rn")
    )
    # Creating the global temp view on top of the dataframe in order to apply the select statement later
    filtered_df.createOrReplaceGlobalTempView(f"tmp_D365_{table}")
    # "query" refers to the output of the notebook I run in the first step
    dfUpdates = spark.sql(query)
    # Collecting the column names from the source dataframe to pass into the merge;
    # we merge on the columns whose names start with BK_.
    p = re.compile("^BK_")
    list_of_columns = dfUpdates.columns
    list_of_BK_columns = [s for s in dfUpdates.columns if p.match(s)]
    # Building the merge condition, e.g. "table.BK_x = newData.BK_x and table.BK_y = newData.BK_y"
    string = " and ".join(
        f"table.{column} = newData.{column}" for column in list_of_BK_columns
    )
    # Mapping every source column to its newData counterpart for the UPDATE/INSERT clauses
    dictionary = {}
    for key in list_of_columns:
        dictionary[key] = f"newData.{key}"
    # Executing the merge itself
    deltaTable = DeltaTable.forPath(
        spark,
        f"abfss://silver@{storage_account}.dfs.core.windows.net/D365/{table.lower()}_ao",
    )
    deltaTable.alias("table").merge(
        dfUpdates.alias("newData"), string
    ).whenMatchedUpdate(set=dictionary).whenNotMatchedInsert(
        values=dictionary
    ).execute()
Step 3.
The writing part:
df.writeStream.foreachBatch(
    lambda df, epochId: update_changefeed(df, table, query, epochId)
).option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start()
The error I'm getting is:
SET column `MSFT_DATASTATE` not found given columns: [`PK_D365_customeraddress`, `IsDelete`, etc.]
Indeed, the MSFT_DATASTATE column is not in my target Delta table yet; the merge is supposed to add it there. I'm not sure where it goes wrong.
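For reference, a minimal way to double-check that the column is really absent from the target (reusing the names and path from the code above) would be something like:

# Illustrative schema comparison between the bronze source and the silver target,
# reusing catalog, table and storage_account from the code above
from delta.tables import DeltaTable

source_cols = set(spark.table(f"{catalog}.bronze.d365_{table.lower()}_ao").columns)
target_cols = set(
    DeltaTable.forPath(
        spark,
        f"abfss://silver@{storage_account}.dfs.core.windows.net/D365/{table.lower()}_ao",
    ).toDF().columns
)

# MSFT_DATASTATE is expected to show up here: present in the bronze source
# but not (yet) in the silver target
print(sorted(source_cols - target_cols))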
It may be worth mentioning that before enabling spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
I was getting the following error:
cannot resolve MSFT_DATASTATE in UPDATE clause given columns ...
Try this out: