I need a rollback operation to run when a certain Airflow task fails. To know what to roll back, I need access to the task's arguments inside the rollback function. The rollback function is passed as the on_failure_callback argument when defining the task.
Take this as a simplified example:
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago


def rollback(context: dict):
    print("How do I access the 'task_argument' value?")


@task(on_failure_callback=rollback)
def example_task(task_argument: str) -> None:
    assert False


@dag(
    schedule_interval=None,
    start_date=days_ago(1),
)
def example_dag() -> None:
    example_task("the task argument's value.")


example_dag()
How do I get the value that was passed to example_task from inside the on_failure_callback? I'm sure it's hiding in the context variable, but I have not been able to find clear documentation on what context contains. context does contain a params field, but that does not contain task_argument.
This code snippet worked for me. Basically, if you are using the @task decorator, you need to specify the context variable in the function arguments, per this doc page: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/taskflow.html#taskflow
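As a minimal sketch of one possible rollback callback (not necessarily the snippet referred to above): it assumes Airflow 2.x, where context["task"] in a failure callback is the operator created by @task and that operator keeps the call arguments in its op_args and op_kwargs attributes. The fake_context object is a hypothetical stand-in so the callback can be tried without a scheduler; it is not something Airflow provides.

```python
from types import SimpleNamespace


def rollback(context: dict) -> tuple:
    """Failure callback: recover the arguments the failed task was called with."""
    # Assumption: context["task"] is the TaskFlow operator, which stores the
    # call arguments as op_args (positional) and op_kwargs (keyword).
    failed_task = context["task"]
    args = tuple(getattr(failed_task, "op_args", ()) or ())
    kwargs = dict(getattr(failed_task, "op_kwargs", {}) or {})
    print(f"Rolling back with args={args!r}, kwargs={kwargs!r}")
    return args, kwargs


# Hypothetical stand-in for the context Airflow would pass to the callback.
# Only the "task" key is modeled here.
fake_context = {
    "task": SimpleNamespace(op_args=("the task argument's value.",), op_kwargs={})
}
recovered_args, recovered_kwargs = rollback(fake_context)
```

In the DAG from the question, this function would be wired up unchanged via @task(on_failure_callback=rollback). If an argument is the output of another task rather than a literal, the stored value may be a reference instead of the resolved value, so this sketch is best suited to literal arguments.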