Existing column unrecognized by Delta merge

Sample of the data I'm working on:

Source

+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|store_id |type                |store_status |        name        |    owner           |owner_code    |store_asOfDate    |
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|  123    |type                |not_active   |name                |xyz                 |    xyz       |        2024-03-20|
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+

Target

+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|store_id |type                |store_status |        name        |    owner           |owner_code    |store_asOfDate    |
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|  123    |type                |active       |name                |xyz                 |    xyz       |        2024-03-15|
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+

Code snippet

target_dt.alias("target") \
    .merge(
        source=df_trusted.alias("source"),
        condition="target.store_id=source.store_id AND target.store_status=source.store_status"
    ) \
    .whenNotMatchedBySourceUpdate(
        set={
            "store_status": F.col("source.store_status"),
            "store_asOfDate": F.col("source.store_asOfDate")
        }
    ) \
    .execute()

Expected behaviour:

  • target's row store_status and store_asOfDate are updated.

Target (after merge/upsert)

+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|store_id |type                |store_status |        name        |    owner           |owner_code    |store_asOfDate    |
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|  123    |type                |not_active   |name                |xyz                 |    xyz       |        2024-03-20|
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+

Currently, an error is thrown:

24/03/21 14:06:29 ERROR Error occured in my_method() method: [DELTA_MERGE_UNRESOLVED_EXPRESSION] Cannot resolve source.store_id in UPDATE condition given columns....

Please suggest where I can debug further for root cause. Thanks in advance!

There is 1 answer

Samuel Demir

I think your goal is to end up with a Delta table like this:

+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|store_id |type                |store_status |        name        |    owner           |owner_code    |store_asOfDate    |
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|  123    |type                |not_active   |name                |xyz                 |    xyz       |        2024-03-20|
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+

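Here is a more general version of your code that takes the merge keys and the columns to update as lists. The events alias refers to the existing target table; the updates alias refers to the DataFrame holding the rows to apply.

Two things change compared with your snippet. First, the merge condition uses only store_id: with store_status in the condition, the source row (not_active) never matches the target row (active). Second, the update moves to whenMatchedUpdate. A whenNotMatchedBySource clause fires for target rows that have no matching source row, so it can only reference target columns, which is why the source.* references cannot be resolved there.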

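merge_keys = ["store_id"]  # columns that identify a row
cols_to_update = ["store_status", "store_asOfDate"]  # columns copied from the source
(
    target_dt.alias("events")
    .merge(
        source=df_trusted.alias("updates"),
        # Match on the key columns only, not on the values being updated.
        condition=" AND ".join([f"events.`{x}` = updates.`{x}`" for x in merge_keys]),
    )
    # String values are parsed as SQL expressions, so pyspark.sql.functions is not needed here.
    .whenMatchedUpdate(set={x: f"updates.`{x}`" for x in cols_to_update})
    .execute()
)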
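
To check what the two comprehensions actually hand to Delta, the condition string and the update map can be built and printed without a Spark session. This is only a sketch: the helper names build_merge_condition and build_update_set are hypothetical and not part of the Delta API.

```python
def build_merge_condition(keys, target_alias="events", source_alias="updates"):
    """Join one equality predicate per merge key with AND (hypothetical helper)."""
    return " AND ".join(
        f"{target_alias}.`{k}` = {source_alias}.`{k}`" for k in keys
    )


def build_update_set(cols, source_alias="updates"):
    """Map each target column name to the SQL expression for its source value."""
    return {c: f"{source_alias}.`{c}`" for c in cols}


condition = build_merge_condition(["store_id"])
update_set = build_update_set(["store_status", "store_asOfDate"])

print(condition)
print(update_set)
```

Printing the two values before calling merge makes it easy to spot a wrong alias or a column that should not be part of the match condition.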

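Also, please avoid backslash (\) line continuations in Python. They are not deprecated, but PEP 8 prefers implicit continuation inside parentheses. Wrap the expression in parentheses instead; a formatter such as black, run as a pre-commit hook, keeps the resulting layout consistent.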
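
For comparison, here is the same expression written both ways. The variable names are made up for illustration only.

```python
values = [1, 2, 3]

# Backslash continuation: legal, but any character after the "\"
# (even a trailing space) turns it into a syntax error.
total_backslash = sum(values) \
    + len(values)

# Implicit continuation inside parentheses, the form PEP 8 prefers.
total_parens = (
    sum(values)
    + len(values)
)

print(total_backslash, total_parens)
```

Both forms evaluate to the same value; the parenthesized one also lets you put a comment on any line of a chained call.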