Sample of my data I'm working on:
Source
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|store_id |type |store_status | name | owner |owner_code |store_asOfDate |
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
| 123 |type |not_active |name |xyz | xyz | 2024-03-20|
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
Target
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|store_id |type |store_status | name | owner |owner_code |store_asOfDate |
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
| 123 |type |active |name |xyz | xyz | 2024-03-15|
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
Code snippet
target_dt.alias("target") \
.merge(
source=df_trusted.alias("source"),
condition="target.store_id=source.store_id AND target.store_status=source.store_status"
) \
.whenNotMatchedBySourceUpdate(
set={
"store_status": F.col("source.store_status"),
"store_asOfDate": F.col("source.store_asOfDate")
}
) \
.execute()
Expected behaviour:
- target's row
store_statusandstore_asOfDateare updated.
Target (after merge/upsert)
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
|store_id |type |store_status | name | owner |owner_code |store_asOfDate |
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
| 123 |type |not_active |name |xyz | xyz | 2024-03-20|
+---------+--------------------+-------------+--------------------+--------------------+--------------+------------------+
Currently, error is thrown:
24/03/21 14:06:29 ERROR Error occured in my_method() method: [DELTA_MERGE_UNRESOLVED_EXPRESSION] Cannot resolve source.store_id in UPDATE condition given columns....
Please suggest where I can debug further for root cause. Thanks in advance!
I think your goal is to have a DeltaTable at the end of:
Let me write your code a bit more advanced as you are then able to set the columns by a list in your merge statement. The
eventalias is the table that is already existing as it contains event records. Theupdatesalias is the table that should contain the rows to update.Also please do not write python code with line separators of type
\as they are deprecated. Instead use parentheses or pre-commit hooks with black formatter.