Dagster: specify multiple TimeWindowPartitionMappings for the same asset dependency

I have a SourceAsset that represents a dataframe partitioned every 15 minutes. I have another asset, lagged_df, in which I want to access both the partition from the previous day and the current partition at the same time (i.e. as two separate parameters).

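import pandas as pd
from dagster import (
    AssetIn,
    OpExecutionContext,
    SourceAsset,
    TimeWindowPartitionMapping,
    asset,
)

# fifteen_minutes_partitions (15-minute time-window partitions) and
# TimePartitionConfig are defined elsewhere (not shown).

original_dfs = SourceAsset(
    key="merged_data",
    io_manager_key="partitioned_parquet_io_manager",
    description="Descriptions",
    partitions_def=fifteen_minutes_partitions,
    metadata={"io_manager": TimePartitionConfig(time_partition_column="timestamp")},
)


@asset(
    io_manager_key="partitioned_parquet_io_manager",
    partitions_def=fifteen_minutes_partitions,
    description="description",
    metadata={"io_manager": TimePartitionConfig(time_partition_column="timestamp")},
    ins={
        # 4 partitions/hour * 24 h * 7 days = 672 partitions back, i.e. one week
        # (one day back would be -4*24 = -96).
        "previous_day_original_dfs": AssetIn(
            "merged_data",  # the asset key of the SourceAsset, not its Python name
            partition_mapping=TimeWindowPartitionMapping(start_offset=-4*24*7, end_offset=-4*24*7),
        ),
        # Same time window as the partition being materialized.
        "current_original_dfs": AssetIn(
            "merged_data",
            partition_mapping=TimeWindowPartitionMapping(start_offset=0, end_offset=0),
        ),
    },
)
def lagged_df(
    context: OpExecutionContext,
    previous_day_original_dfs: pd.DataFrame,
    current_original_dfs: pd.DataFrame,
) -> pd.DataFrame:
    # Placeholder body
    return pd.DataFrame()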
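For reference, the offsets passed to TimeWindowPartitionMapping count partitions rather than time, so a lag has to be converted into a number of 15-minute partitions. The sketch below is plain Python with no Dagster calls; it assumes the partitions are contiguous 15-minute windows, and partitions_back and lagged_window are hypothetical helper names used only for illustration.

```python
from datetime import datetime, timedelta
from typing import Tuple

PARTITION_MINUTES = 15  # width of one partition in fifteen_minutes_partitions


def partitions_back(lag: timedelta, partition_minutes: int = PARTITION_MINUTES) -> int:
    """Hypothetical helper: negative partition offset for a given time lag."""
    width = timedelta(minutes=partition_minutes)
    if lag % width:  # a non-zero remainder means the lag is not a whole number of partitions
        raise ValueError("lag must be a whole number of partitions")
    return -(lag // width)


def lagged_window(
    current_start: datetime, offset: int, partition_minutes: int = PARTITION_MINUTES
) -> Tuple[datetime, datetime]:
    """Hypothetical helper: half-open [start, end) of the partition `offset` partitions away."""
    width = timedelta(minutes=partition_minutes)
    start = current_start + offset * width
    return start, start + width


one_day = partitions_back(timedelta(days=1))   # -96
one_week = partitions_back(timedelta(days=7))  # -672, i.e. -4*24*7 as in the code above
print(one_day, one_week)
print(lagged_window(datetime(2024, 3, 5, 10, 15), one_day))
```

With these numbers, start_offset=-96 and end_offset=-96 should select the single partition exactly one day before the one being materialized, whereas -4*24*7 selects the partition one week earlier.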

However, when I run the code in the debugger, I always get KeyError: 'previous_day_original_dfs'.

I tried removing current_original_dfs, and then it works: I no longer get the error for the previous-day input. So the problem seems to be that "ins" can only contain one entry per asset key. That forces me to fall back on a single mapping that spans from the previous day up to the current partition, which reads a lot of data I don't need (every partition in between).
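The single-mapping fallback (for example start_offset=-96, end_offset=0, which covers 97 fifteen-minute partitions) still loads everything in between, but the two slices can at least be separated after loading. A minimal sketch, assuming each row carries a timestamp (as the time_partition_column setting suggests) and using plain (timestamp, value) tuples in place of a DataFrame so it runs with the standard library only; split_lag_and_current is a hypothetical helper, not a Dagster API. With pandas, the same filter would be a boolean mask on the timestamp column.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

Row = Tuple[datetime, float]  # (timestamp, value); stands in for a DataFrame row


def split_lag_and_current(
    rows: List[Row],
    current_start: datetime,
    lag_partitions: int = 96,  # one day of 15-minute partitions
    partition_minutes: int = 15,
) -> Tuple[List[Row], List[Row]]:
    """Hypothetical helper: keep only the lagged and the current partition of a wide window."""
    width = timedelta(minutes=partition_minutes)
    lag_start = current_start - lag_partitions * width

    def in_window(ts: datetime, start: datetime) -> bool:
        # Partition windows are half-open: [start, start + width)
        return start <= ts < start + width

    previous = [r for r in rows if in_window(r[0], lag_start)]
    current = [r for r in rows if in_window(r[0], current_start)]
    return previous, current


# Synthetic input only to exercise the helper: one row every 5 minutes, ~25 hours back.
now = datetime(2024, 3, 5, 10, 15)
rows = [(now - timedelta(minutes=5 * i), float(i)) for i in range(300)]
prev, cur = split_lag_and_current(rows, now)
print(len(prev), len(cur))
```

This does not reduce how much data the IO manager reads; it only avoids carrying the unneeded rows into the asset's logic.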

Is there a way to specify two different TimeWindowPartitionMappings for the same asset?

If not, is there a workaround?
