I have a SourceAsset
that represents a dataframe partitioned every 15 minutes, I have another asset that I call lagged_df
in which I want to have the previous day and current partition at the same time . (In two parameters therefore)
original_dfs = SourceAsset(
key="merged_data",
io_manager_key="partitioned_parquet_io_manager",
description="Descriptions",
partitions_def=fifteen_minutes_partitions,
metadata={"io_manager": TimePartitionConfig(time_partition_column="timestamp")},
)
@asset(
io_manager_key="partitioned_parquet_io_manager",
partitions_def=fifteen_minutes_partitions,
description="description",
metadata={"io_manager": TimePartitionConfig(time_partition_column="timestamp")},
ins={
"previous_day_original_dfs ": AssetIn(
"original_dfs",
partition_mapping=TimeWindowPartitionMapping(start_offset=-4*24*7, end_offset=-4*24*7),
),
"current_original_dfs": AssetIn(
"original_dfs",
partition_mapping=TimeWindowPartitionMapping(start_offset=0, end_offset=0),
),
},
)
def lagged_df(
context: OpExecutionContext,
previous_original_dfs: pd.DataFrame,
current_original_dfs: pd.DataFrame,
) -> pd.DataFrame:
foo = 2
return pd.DataFrame()
However after launching the code in the debugger I always receive a KeyError previous_day_original_dfs
I have tried to remove the current_original_dfs
and it works, I do not have anymore an error for previous_original_dfs
. Problem is therefore that it seems I can only put one "ins
" with the same asset key. Which forces me to read a whole lot of data that I don't need (all data from now to the previous data)
Is there a way to specify two different TimeWindowPartitionMapping
for a same asset?
If not do you have any workaround?