I have an Airflow DAG with the dependencies start_task >> group1 and start_task >> group2.
The tasks live in the "tasks" dictionary and are used to fill the task groups in the DAG:
from airflow import DAG
from datetime import datetime
from airflow.operators.empty import EmptyOperator
from python.etl.dag_param_generator import DAGParamGenerator
from airflow.utils.task_group import TaskGroup

dag_param_generator = DAGParamGenerator('TEST_DINAMIC_DAG')

tasks = {
    "task_1": EmptyOperator(task_id="task_1"),
    "task_2": EmptyOperator(task_id="task_2"),
    "task_3": EmptyOperator(task_id="task_3"),
    "task_4": EmptyOperator(task_id="task_4")
}

with DAG(
    dag_id=dag_param_generator.get_dag_params('dag_id'),
    schedule=dag_param_generator.get_dag_params('schedule'),
    start_date=dag_param_generator.get_dag_params('start_date'),
    catchup=dag_param_generator.get_dag_params('catchup')
) as dag:
    start_task = EmptyOperator(task_id="start_task")

    with TaskGroup("group1") as group1:
        for key, value in tasks.items():
            if key in ['task_1', 'task_2']:
                print(value)
                globals()[key] = value

    with TaskGroup("group2") as group2:
        for key, value in tasks.items():
            if key in ['task_3', 'task_4']:
                print(value)
                globals()[key] = value

    start_task >> group1
    start_task >> group2
The tasks don't end up in the groups, and I get the following result (the groups are empty):

How can I resolve this problem?
I have no idea what you mean by globals() there, or why you are looping twice. Instead, add the tasks to the group. As to
globals(): the act of creating a variable named task_1 in globals() does not create a task. Running the constructor of the task, i.e. EmptyOperator(), creates a task — and it registers with whatever TaskGroup context is active at that moment. Your operators are constructed at module level, before any TaskGroup exists, so they attach to the DAG root and the groups stay empty. You could have an array of lambdas if you want late execution. As in: