Skipping a task in Airflow


I would like to have an optional task in my DAG that runs or not depending on a DAG parameter. If it is skipped, I don't want the downstream tasks to be skipped too. I want it to be in the skipped state, with the right color, so it is clear that it was skipped. That's why I don't want to use a condition inside the task. I also don't want to change the following tasks (for example, I don't want to change their trigger rule).

I tried wrapping my task in a task group and adding another operator, but I still have a problem: the tasks after my task group are skipped. I would appreciate help fixing this, or another way to skip a task.

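from airflow import DAG
from airflow.decorators import task, task_group
from airflow.models.param import Param

with DAG(dag_id="my_dag", params={"should_run": Param(False)}):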
    @task_group
    def my_group():
        @task.short_circuit()
        def should_run(**kwargs):
            return kwargs["params"]["should_run"]

        @task
        def do_run():
            print("This is the optional task!")

        should_run() >> do_run()


    @task()
    def next_task():
        print("Next task, other tasks can follow this one")

    my_group() >> next_task()

There are 2 answers

Doof On

This is easy to implement. Either of these two approaches works:

  1. Introduce a branch operator and put the condition in its callable.
  2. Set a trigger rule on the downstream task, so that it still runs when the optional task is skipped based on the parameter.

This should help!

Adding an example, as requested by the author. Here is the code:

from airflow.decorators import task, task_group
from airflow import DAG
from datetime import datetime
from airflow.operators.python import BranchPythonOperator
from airflow.models import Param

with DAG(
    dag_id="my_dag",
    start_date=datetime(2022, 10, 1),
    params={"should_run": False}
):

    @task(task_id="option_1")
    def option_1():
        print("This is the optional task 1 the value of it depends on the param should_run !")

    def choose_branch(params):
        print(params)
        print(params["should_run"])

        if params["should_run"]:
            return ["option_1"]
        return ["option_2"]

    branch = BranchPythonOperator(
        task_id='branch_choose',
        python_callable=choose_branch
    )

    @task(task_id="first_task")
    def first_task():
        print("First task, other tasks can follow this one")

    @task(task_id="option_2")
    def option_2():
        print("This is the optional task 2 the value of it depends on the param should_run !")

    @task(task_id="final_task", trigger_rule='one_success')
    def final_task():
        print("Final task, this marks the end of the pipeline")

    first_task() >> branch >> [option_1(), option_2()] >> final_task()

This is the result when the input param is set to False:

[![enter image description here][1]][1]

This is the result when the input param is set to True, i.e. { "should_run": "True"}:

[![enter image description here][2]][2]
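
One thing to watch with a run configuration like { "should_run": "True"}: the value arrives as a string, and any non-empty string is truthy in Python, so "False" passed as a string would also select option_1 in `choose_branch`. Here is a minimal sketch of a more defensive callable, assuming the param can arrive either as a real boolean or as a string. `as_bool` is a hypothetical helper written for this example, not part of Airflow.

```python
def as_bool(value) -> bool:
    """Interpret a param value that may be a bool or a string (hypothetical helper)."""
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        # Only these spellings count as true; everything else is false
        return value.strip().lower() in ("true", "1", "yes")
    return bool(value)


def choose_branch(params):
    """Return the task_id(s) to follow, as in the branch example above."""
    if as_bool(params["should_run"]):
        return ["option_1"]
    return ["option_2"]


# A plain truthiness check treats the string "False" as true
print(bool("False"))
# The defensive version picks the other branch for the same input
print(choose_branch({"should_run": "False"}))
```

Passing a real JSON boolean in the run configuration avoids the issue without any helper.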

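Here is another way, using a short circuit. With ignore_downstream_trigger_rules=False, only the tasks directly downstream of the short circuit are skipped; tasks further downstream are evaluated by their own trigger rules, so final_task (none_failed) still runs: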

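# In addition to the imports from the first example
from airflow.operators.python import ShortCircuitOperator

with DAG(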
    dag_id="my_dag",
    start_date=datetime(2022, 10, 1),
    params={"should_run": False}
):

    @task(task_id="option_1")
    def option_1():
        print("This is the optional task 1 the value of it depends on the param should_run !")

    def run_mybranch(params):
        """Decide whether the optional downstream tasks should run.

        Returns:
            bool: the value of the should_run param; False skips the
            tasks directly downstream of the short circuit
        """
        return bool(params["should_run"])

    branch = ShortCircuitOperator(
        task_id='branch',
        python_callable=run_mybranch,
        ignore_downstream_trigger_rules=False,
    )

    @task(task_id="first_task")
    def first_task():
        print("First task, other tasks can follow this one")

    @task(task_id="option_2")
    def option_2():
        print("This is the optional task 2 the value of it depends on the param should_run !")

    @task(task_id="final_task", trigger_rule='none_failed')
    def final_task():
        print("Final task, this marks the end of the pipeline")

    op1 = option_1()
    op2 = option_2()
    ft = final_task()

    first_task() >> branch >> [op1, op2]
    op1 >> ft
    op2 >> ft


[![enter image description here][3]][3]
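
To see why the trigger rule on final_task matters in the short-circuit version, here is a small pure-Python model of that DAG. It is a simplified sketch, not Airflow's scheduler logic: it only knows the states "success" and "skipped", it only covers three trigger rules, and the function names `rule_allows_run` and `simulate` are made up for this illustration.

```python
from typing import Dict, List


def rule_allows_run(rule: str, upstream_states: List[str]) -> bool:
    """Simplified check of a trigger rule against finished upstream states."""
    if rule == "all_success":
        return all(s == "success" for s in upstream_states)
    if rule == "none_failed":
        # Skipped upstream tasks are acceptable, failures are not
        return all(s in ("success", "skipped") for s in upstream_states)
    if rule == "one_success":
        return any(s == "success" for s in upstream_states)
    raise ValueError(f"unsupported rule in this sketch: {rule}")


def simulate(should_run: bool, final_rule: str) -> Dict[str, str]:
    """Model first_task >> branch >> [option_1, option_2] >> final_task."""
    # The short-circuit task itself succeeds whatever its callable returns
    states = {"first_task": "success", "branch": "success"}

    # With ignore_downstream_trigger_rules=False, only the direct
    # downstream tasks are skipped when the callable returns a falsy value
    direct_state = "success" if should_run else "skipped"
    states["option_1"] = direct_state
    states["option_2"] = direct_state

    # final_task is then judged by its own trigger rule; in this model
    # an unsatisfied rule means the task is skipped
    upstream = [states["option_1"], states["option_2"]]
    allowed = rule_allows_run(final_rule, upstream)
    states["final_task"] = "success" if allowed else "skipped"
    return states


print(simulate(should_run=False, final_rule="none_failed")["final_task"])
print(simulate(should_run=False, final_rule="all_success")["final_task"])
```

In this model, final_task runs with none_failed when the optional tasks are skipped, while the default all_success rule would leave it skipped as well, which matches the behavior described in the question.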


  [1]: https://i.stack.imgur.com/uCIHT.png
  [2]: https://i.stack.imgur.com/yVl7f.png
  [3]: https://i.stack.imgur.com/NE3fV.png
caxefaizan On

One option is to have the optional task push an XCom that says whether it should be treated as skipped. Say your condition evaluates to True; at that point the optional task has finished and is shown in green (success).

Then, in next_task, use the DAG run from the task context to fetch the task instance of the optional task, and set its state to skipped if the XCom value was True.

This changes the color of the optional task, and the tasks immediately after it are not skipped.

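def mark_skipped(context, optional_task_id="optional_task"):
    # "optional_task" is a placeholder; pass the task_id of your optional task
    dag_run_object = context["dag_run"]
    list_of_tasks = dag_run_object.get_task_instances()

    # Find the optional task's instance by task_id and mark it as skipped
    for task_instance in list_of_tasks:
        if task_instance.task_id == optional_task_id:
            task_instance.set_state('skipped')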