How to use default and runtime name for same task?

44 views Asked by At

I've below dag

from airflow import DAG
from airflow.utils.task_group import TaskGroup
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.models import Variable
import pendulum
from datetime import datetime

local_tz = pendulum.timezone('US/Pacific')

DAG_ID = "demo"
with DAG(
        dag_id=DAG_ID,
        default_args={
            "owner": "GFT Reporting Services",
            "depends_on_past": False,
            "email": ["[email protected]"],
            "email_on_failure": False,
            "email_on_retry": False
        },
        default_view="graph",
        schedule_interval=None,
        start_date=datetime(2022, 1, 4, tzinfo=local_tz),
        tags=['demo'],
        catchup=False

) as dag:

    def get_runtime_params(**kwargs):

        dag_run: DagRun = kwargs['dag_run']
        message = dag_run.conf['message']
        period = message['period']

        Variable.set('trigger_period', value=period, serialize_json=True)
        return "Its Demo"

    def get_doing(**kwargs):

        print("doing Airflow")

    start = DummyOperator(task_id='begin_execution')
    end = DummyOperator(task_id='end_execution')

    with TaskGroup(group_id='demo_compute') as demo_compute:
    
        task1 = PythonOperator(dag=dag,
                                                 task_id='get_params_to_update_config',
                                                 python_callable=get_runtime_params
                                                 )

        taskName = Variable.get("trigger_period", deserialize_json=True)
        with TaskGroup(group_id=taskName) as inside_demo_compute:

            get_doingTask = PythonOperator(dag=dag,
                                            task_id='get_doing',
                                         python_callable=get_doing
                                         )

        task1 >> get_doingTask

    start >> demo_compute >> end

As you can see, taskName default value is assigned from Variable trigger_period, and when dag is triggered and running, it takes value from trigger_period variable only, but its updated by the value from conf

Any option to not to use taskName default value from Variable = trigger_period and manage it in Dag itself ?

Any suggestion will be helpful

0

There are 0 answers