Task grouping in Airflow

I have an Airflow DAG with these dependencies: start_task >> group1 and start_task >> group2.

The tasks are stored in the "tasks" dictionary and are used to populate the task groups in the DAG:

from airflow import DAG
from datetime import datetime
from airflow.operators.empty import EmptyOperator
from python.etl.dag_param_generator import DAGParamGenerator
from airflow.utils.task_group import TaskGroup

dag_param_generator = DAGParamGenerator('TEST_DINAMIC_DAG')
tasks = {
    "task_1": EmptyOperator(task_id="task_1"),
    "task_2": EmptyOperator(task_id="task_2"),
    "task_3": EmptyOperator(task_id="task_3"),
    "task_4": EmptyOperator(task_id="task_4")
}

with DAG(
    dag_id=dag_param_generator.get_dag_params('dag_id'),
    schedule=dag_param_generator.get_dag_params('schedule'),
    start_date=dag_param_generator.get_dag_params('start_date'),
    catchup=dag_param_generator.get_dag_params('catchup')
) as dag:
    start_task = EmptyOperator(task_id="start_task")

    with TaskGroup("group1") as group1:
        for key, value in tasks.items():
            if key in ['task_1', 'task_2']:
                print(value)
                globals()[key] = value
    with TaskGroup("group2") as group2:
        for key, value in tasks.items():
            if key in ['task_3', 'task_4']:
                print(value)
                globals()[key] = value
                
    start_task >> group1
    start_task >> group2

The tasks don't end up in the groups: in the resulting DAG, both groups are empty.

How can I resolve this problem?

There is 1 answer

Answered by KamilCuk (best answer)

I'm not sure what globals() is meant to do there, or why you loop over the whole dictionary twice. Instead, add each task to its group explicitly:

with TaskGroup("group1") as group1:
    for key in ['task_1', 'task_2']:
        group1.add(tasks[key])
with TaskGroup("group2") as group2:
    for key in ['task_3', 'task_4']:
        group2.add(tasks[key])

As for globals(): creating a global variable named task_1 does not create a task. Running the task's constructor, i.e. EmptyOperator(), is what creates it. If you want late execution, you could store lambdas in the dictionary instead and call them inside the groups:

tasks = {
    "task_1": lambda: EmptyOperator(task_id="task_1"),
    "task_2": lambda: EmptyOperator(task_id="task_2"),
    "task_3": lambda: EmptyOperator(task_id="task_3"),
    "task_4": lambda: EmptyOperator(task_id="task_4")
}

    with TaskGroup("group1") as group1:
        for key in ['task_1', 'task_2']:
            tasks[key]()
    with TaskGroup("group2") as group2:
        for key in ['task_3', 'task_4']:
            tasks[key]()
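
To see why the moment of construction matters, here is a toy model in plain Python. The Group and Task classes are hypothetical stand-ins written for this illustration, not Airflow's actual implementation; they only assume that, as with TaskGroup, an object registers itself with whichever group is open at the time its constructor runs.

```python
# Toy model only: Group and Task are hypothetical stand-ins, NOT Airflow classes.
class Group:
    _active = []  # stack of groups currently opened with `with`

    def __init__(self, name):
        self.name = name
        self.children = []

    def __enter__(self):
        Group._active.append(self)
        return self

    def __exit__(self, exc_type, exc, tb):
        Group._active.pop()
        return False


class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        # Registration happens in the constructor, and only if a group is open.
        if Group._active:
            Group._active[-1].children.append(self)


# Eager: tasks are constructed before any group is open, so none register.
eager = {"task_1": Task("task_1"), "task_2": Task("task_2")}
with Group("group1") as eager_group:
    for key, value in eager.items():
        globals()[key] = value  # only binds a name; no constructor runs here

# Lazy: each lambda runs the constructor inside the `with` block.
lazy = {"task_1": lambda: Task("task_1"), "task_2": lambda: Task("task_2")}
with Group("group1") as lazy_group:
    for key in ["task_1", "task_2"]:
        lazy[key]()

print([t.task_id for t in eager_group.children])  # []
print([t.task_id for t in lazy_group.children])   # ['task_1', 'task_2']
```

In this sketch the eager group stays empty, which mirrors the behavior in the question, while the lazy group picks up both tasks because they are constructed while the group is open.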