Airflow DAG dependency configuration to run only one path


In an Airflow DAG containing EMR-related tasks, I defined the task dependencies below. I need only one path to run from the MWAA schedule (create_cluster_a -> setup_active -> trigger_a -> trigger_b -> trigger_c -> trigger_d). The other dependencies are only needed in case of failure, when a run has to start from a specific step.

test_create_cluster_a >> setup_active_emr_cluster_a_id >> test_emr_trigger_job_a >> test_emr_trigger_job_b >> test_emr_trigger_job_c >> test_emr_trigger_job_d
test_create_cluster_b >> setup_active_emr_cluster_b_id >> test_emr_trigger_job_b >> test_emr_trigger_job_c >> test_emr_trigger_job_d
test_create_cluster_c >> setup_active_emr_cluster_c_id >> test_emr_trigger_job_c >> test_emr_trigger_job_d
test_create_cluster_d >> setup_active_emr_cluster_d_id >> test_emr_trigger_job_d

But currently, it runs all of the paths.

1 Answer

Answered by subram:

Have you tried using the BranchPythonOperator? With this operator you can define a Python callable that checks a condition and returns the task_id of the task to run next; the branches that are not returned get skipped. See https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/python/index.html#airflow.operators.python.BranchPythonOperator. A sample Python callable:

def my_branch_func(ti) -> str:
    # Return the task_id of the branch to run; the other branches are skipped.
    if condition_A:
        return 'task_trigger_a'
    elif condition_B:
        return 'task_trigger_b'
    else:
        return 'task_trigger_c'
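
In your case the condition could come from the run itself. One option (a sketch, not part of the original answer; the start_from conf key is hypothetical) is to read the DAG run configuration, so a plain scheduled run takes the default path and a manually triggered run can pick another entry point:

def my_branch_func(**context) -> str:
    # Hypothetical 'start_from' key in dag_run.conf; scheduled runs pass no conf,
    # so they fall back to the default branch.
    start_from = (context["dag_run"].conf or {}).get("start_from", "a")
    if start_from == "b":
        return 'task_trigger_b'
    if start_from == "c":
        return 'task_trigger_c'
    return 'task_trigger_a'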

Task definition:

from airflow.operators.python import BranchPythonOperator

task_branch_by_condition = BranchPythonOperator(
    task_id='task_branch_by_condition',
    python_callable=my_branch_func,  # provide_context is deprecated and not needed in Airflow 2.x
)

And at the bottom of the DAG, specify the order like this:

some_other_tasks >> task_branch_by_condition >> [task_trigger_a, task_trigger_b, task_trigger_c]
task_trigger_a >> do_something_a
task_trigger_b >> do_something_b
task_trigger_c >> do_something_c
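
Applied to your DAG, a minimal self-contained sketch (not the original answer's code) could look like the following. The task names come from your question, but the branch_start_path task, the choose_start_task callable, the start_from conf key, the DAG settings, and the EmptyOperator placeholders standing in for your real EMR operators are all assumptions for illustration, and it assumes a recent Airflow 2.x on MWAA. The key points are that a scheduled run defaults to the create_cluster_a path, and the shared trigger jobs use trigger_rule=none_failed_min_one_success so the skips from the unused entry paths do not cascade into them:

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
import pendulum


def choose_start_task(**context) -> str:
    # Hypothetical: read the entry point from dag_run.conf; a scheduled run passes no
    # conf, so it defaults to the cluster-a path. Manual runs can set start_from=b/c/d.
    start_from = (context["dag_run"].conf or {}).get("start_from", "a")
    return {
        "a": "test_create_cluster_a",
        "b": "test_create_cluster_b",
        "c": "test_create_cluster_c",
        "d": "test_create_cluster_d",
    }.get(start_from, "test_create_cluster_a")


with DAG(
    dag_id="emr_one_path_example",  # hypothetical DAG id
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
) as dag:
    branch_start_path = BranchPythonOperator(
        task_id="branch_start_path",
        python_callable=choose_start_task,
    )

    # EmptyOperator stands in for your real EMR operators. The shared trigger jobs need a
    # trigger rule that tolerates skipped upstreams, otherwise the skip from the unused
    # entry paths cascades and skips them as well.
    def placeholder(task_id, shared=False):
        rule = TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS if shared else TriggerRule.ALL_SUCCESS
        return EmptyOperator(task_id=task_id, trigger_rule=rule)

    test_create_cluster_a = placeholder("test_create_cluster_a")
    test_create_cluster_b = placeholder("test_create_cluster_b")
    test_create_cluster_c = placeholder("test_create_cluster_c")
    test_create_cluster_d = placeholder("test_create_cluster_d")
    setup_active_emr_cluster_a_id = placeholder("setup_active_emr_cluster_a_id")
    setup_active_emr_cluster_b_id = placeholder("setup_active_emr_cluster_b_id")
    setup_active_emr_cluster_c_id = placeholder("setup_active_emr_cluster_c_id")
    setup_active_emr_cluster_d_id = placeholder("setup_active_emr_cluster_d_id")
    test_emr_trigger_job_a = placeholder("test_emr_trigger_job_a")
    test_emr_trigger_job_b = placeholder("test_emr_trigger_job_b", shared=True)
    test_emr_trigger_job_c = placeholder("test_emr_trigger_job_c", shared=True)
    test_emr_trigger_job_d = placeholder("test_emr_trigger_job_d", shared=True)

    branch_start_path >> [test_create_cluster_a, test_create_cluster_b,
                          test_create_cluster_c, test_create_cluster_d]

    # Same chains as in the question.
    test_create_cluster_a >> setup_active_emr_cluster_a_id >> test_emr_trigger_job_a >> test_emr_trigger_job_b >> test_emr_trigger_job_c >> test_emr_trigger_job_d
    test_create_cluster_b >> setup_active_emr_cluster_b_id >> test_emr_trigger_job_b >> test_emr_trigger_job_c >> test_emr_trigger_job_d
    test_create_cluster_c >> setup_active_emr_cluster_c_id >> test_emr_trigger_job_c >> test_emr_trigger_job_d
    test_create_cluster_d >> setup_active_emr_cluster_d_id >> test_emr_trigger_job_d

With this wiring, the scheduled run takes only the cluster-a path (everything else is skipped), and triggering the DAG manually with, for example, {"start_from": "c"} starts from create_cluster_c and still runs trigger_job_c and trigger_job_d because of the permissive trigger rule.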