I would like to have an optional task in my dag, which runs according to a dag parameter. If is it skipped - I don't want the downstream tasks to be skipped too. I want it to in a skipped state with the right color so it will be clear it is skipped - that's why I don't want to use a condition inside of the task. I also don't want to change the next tasks (for example I don't want to change the trigger rule condition).
I tried to wrap my task with a task group and add another operator, but I still have a problem - the tasks after my task group are skipped. I will be happy if you can help me to fix the issue or provide with another option to skip a task.
with DAG(dag_id="my_dag", params={"should_run": Param(False)}):
@task_group
def my_group():
@task.short_circuit()
def should_run(**kwargs):
return kwargs["params"]["should_run"]
@task
def do_run():
print("This is the optional task!")
should_run() >> do_run()
@task()
def next_task():
print("Next task, other tasks can follow this one")
my_group() >> next_task()
This is so easy to implement , follow any three ways:
This should help !
Adding an example as requested by author, here is the code
this is when the input param is set to False
[![enter image description here][1]][1]
this is when the input param is set to True i.e. { "should_run": "True"}
[![enter image description here][2]][2]
This is another way using short circuit, this doesnt skip the downstream,