Delta merge doesn't update schema - autoMerge.enabled

My Delta table is failing to update its schema and pick up a new column during a merge. The problem occurs with only one table; the rest work fine. The code below is dynamic, so it is run the same way for every table.

Detailed description:

Step 1.

Reading the dataframe and running another notebook to generate the SQL statement as a string, and enabling schema auto-merge:

from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window
from delta.tables import DeltaTable
import re

# Enable autoMerge for schema evolution
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "True")

query = dbutils.notebook.run(
    f"/bronze views/VW_D365_{table.upper()}",
    3600,
    {
        "append_only_mode": "yes",
        "incremental": "yes",
        "catalog": catalog,
        "schema": schema,
    },
)

df = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .table(f"{catalog}.bronze.d365_{table.lower()}_ao")
)
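
For context, the change feed read above adds the CDF metadata columns _change_type, _commit_version and _commit_timestamp on top of the table's own columns, which is why the function in step 2 filters on and then drops them. A minimal sketch of inspecting them with a one-off batch read (same table name as above, assuming change data feed is enabled on the bronze table):

# Sketch: bounded (batch) read of the change feed to inspect the CDF metadata columns
cdf_sample = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)  # read changes from the first available version
    .table(f"{catalog}.bronze.d365_{table.lower()}_ao")
)
cdf_sample.select("_change_type", "_commit_version", "_commit_timestamp").show(5)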

Step 2. Then I define the function that will be passed to foreachBatch when writing the data into the target table.

def update_changefeed(df, table, query, epochId):
    # Do some transformation and pick up the necessary data
    filtered_df = df.filter(
        col("_change_type").isin("insert", "update_postimage", "delete")
    )
    filtered_df = filtered_df.drop(
        "_commit_timestamp", "_change_type", "_commit_version"
    )
    # Keep only the latest change per id, based on modifiedon
    w = Window.partitionBy("id").orderBy(F.col("modifiedon").desc())
    filtered_df = (
        filtered_df.withWatermark("modifiedon", "1 second")
        .withColumn("rn", F.row_number().over(w))
        .where(F.col("rn") == 1)
        .drop("rn")
    )
    # Creating the global temp view on top of the dataframe in order to apply the select statement later
    filtered_df.createOrReplaceGlobalTempView(f"tmp_D365_{table}")
    # "query" refers to the output of the notebook run in the first step
    dfUpdates = sqlContext.sql(query)
    # Collect the column names from the source and target tables to pass into the merge.
    # We merge on the columns whose names start with BK_.
    p = re.compile("^BK_")
    list_of_columns = dfUpdates.columns
    list_of_BK_columns = [s for s in dfUpdates.columns if p.match(s)]
    string = ""
    for column in list_of_BK_columns:
        string += f"table.{column} = newData.{column} and "
    string = string[: -len(" and ")]  # drop the trailing " and "
    dictionary = {}
    for key in list_of_columns:
        dictionary[key] = f"newData.{key}"
    # Executing the merge function itself

    deltaTable = DeltaTable.forPath(
        spark,
        f"abfss://silver@{storage_account}.dfs.core.windows.net/D365/{table.lower()}_ao",
    )
    deltaTable.alias("table").merge(
        dfUpdates.alias("newData"), string
    ).whenMatchedUpdate(set=dictionary).whenNotMatchedInsert(
        values=dictionary
    ).execute()
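
To make the string building concrete, here is a small standalone illustration with made-up column names (BK_accountid, BK_addressnumber, name, modifiedon are hypothetical), showing the merge condition and the update/insert mapping that end up being passed to the merge:

# Standalone illustration with hypothetical column names
list_of_columns = ["BK_accountid", "BK_addressnumber", "name", "modifiedon"]
list_of_BK_columns = ["BK_accountid", "BK_addressnumber"]

merge_condition = " and ".join(f"table.{c} = newData.{c}" for c in list_of_BK_columns)
# -> table.BK_accountid = newData.BK_accountid and table.BK_addressnumber = newData.BK_addressnumber

set_dictionary = {c: f"newData.{c}" for c in list_of_columns}
# -> {"BK_accountid": "newData.BK_accountid", ..., "modifiedon": "newData.modifiedon"}

print(merge_condition)
print(set_dictionary)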

Step 3.

The writing part:

df.writeStream.foreachBatch(
    lambda df, epochId: update_changefeed(df, table, query, epochId)
).option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start()
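
Not part of the original code, but since trigger(availableNow=True) processes the available backlog and then stops on its own, it can help to keep the handle returned by start() so the notebook waits for the run to finish before moving on. A variant sketch of the same write:

# Sketch: keep the streaming query handle and block until the availableNow run completes
streaming_query = (
    df.writeStream.foreachBatch(
        lambda batch_df, epochId: update_changefeed(batch_df, table, query, epochId)
    )
    .option("checkpointLocation", checkpoint_directory)
    .trigger(availableNow=True)
    .start()
)
streaming_query.awaitTermination()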

The error I'm getting is:

SET column `MSFT_DATASTATE` not found given columns: [`PK_D365_customeraddress`, `IsDelete` etc]

Indeed, the MSFT_DATASTATE column is not in my target Delta table, and the merge is supposed to add it there. I'm not sure where it goes wrong.

It may be worth mentioning that before enabling spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "True"),

I was getting the following error:

cannot resolve MSFT_DATASTATE in UPDATE clause given columns ...
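
One way to narrow this down (a diagnostic sketch, not part of the original post) is to confirm, inside the micro-batch just before the merge, that the autoMerge setting is visible to the session, and to list which source columns are missing from the target, using the same path variables as above:

# Diagnostic sketch: check the session conf and compare source vs. target schemas
print(spark.conf.get("spark.databricks.delta.schema.autoMerge.enabled", "not set"))

target_df = DeltaTable.forPath(
    spark,
    f"abfss://silver@{storage_account}.dfs.core.windows.net/D365/{table.lower()}_ao",
).toDF()

# Columns present in the source (dfUpdates) but missing from the target table,
# e.g. MSFT_DATASTATE in the failing case
print(set(dfUpdates.columns) - set(target_df.columns))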

1 Answer

Aymen Azoui:

try this out:

from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window
from delta.tables import DeltaTable
import re

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "True")
spark.conf.set("spark.databricks.delta.schema.autoOverwrite.enabled", "True")

query = dbutils.notebook.run(
    f"/bronze views/VW_D365_{table.upper()}",
    3600,
    {
        "append_only_mode": "yes",
        "incremental": "yes",
        "catalog": catalog,
        "schema": schema,
    },
)

df = spark.readStream.format("delta").option("readChangeFeed", "true").table(f"{catalog}.bronze.d365_{table.lower()}_ao")

def update_changefeed(df, table, query, epochId):
    filtered_df = df.filter(col("_change_type").isin("insert", "update_postimage", "delete"))
    filtered_df = filtered_df.drop("_commit_timestamp", "_change_type", "_commit_version")
    w = Window.partitionBy("id").orderBy(col("modifiedon").desc())
    filtered_df = (
        filtered_df.withWatermark("modifiedon", "1 second")
        .withColumn("rn", F.row_number().over(w))
        .where(col("rn") == 1)
        .drop("rn")
    )
    
    filtered_df.createOrReplaceGlobalTempView(f"tmp_D365_{table}")
    
    dfUpdates = sqlContext.sql(query)
    
    target_path = f"abfss://silver@{storage_account}.dfs.core.windows.net/D365/{table.lower()}_ao"
    deltaTable = DeltaTable.forPath(spark, target_path)
    # Add any source columns that are missing from the target table (typed as STRING here)
    # so the merge no longer fails on unknown SET columns
    existing_columns = deltaTable.toDF().columns
    for column in dfUpdates.columns:
        if column not in existing_columns:
            spark.sql(f"ALTER TABLE delta.`{target_path}` ADD COLUMNS ({column} STRING)")

    p = re.compile("^BK_")
    list_of_BK_columns = [s for s in dfUpdates.columns if p.match(s)]
    merge_condition = " and ".join([f"table.{c} = newData.{c}" for c in list_of_BK_columns])
    
    dictionary = {key: f"newData.{key}" for key in dfUpdates.columns}
    
    deltaTable.alias("table").merge(dfUpdates.alias("newData"), merge_condition).whenMatchedUpdate(set=dictionary).whenNotMatchedInsert(values=dictionary).execute()

checkpoint_directory = "YOUR_CHECKPOINT_DIRECTORY"  # Update with your checkpoint directory
df.writeStream.foreachBatch(
    lambda df, epochId: update_changefeed(df, table, query, epochId)
).option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start()
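
As a follow-up (not from the original answer), one quick way to verify that the schema actually evolved after a run is to read the target back and check for the new column:

# Quick verification: confirm the previously missing column now exists on the target
target_path = f"abfss://silver@{storage_account}.dfs.core.windows.net/D365/{table.lower()}_ao"
print("MSFT_DATASTATE" in spark.read.format("delta").load(target_path).columns)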