How to recursively evaluate an Airflow macro {{ ds }} passed into DAG params


We have an Airflow DAG that extracts data from a database table. It usually extracts one day of data at a time and exports it to another system. The DAG is scheduled @daily and extracts data for the given logical date {{ ds }} using a SQL WHERE clause like

WHERE date BETWEEN '{{ ds }}' AND '{{ ds }}'

This is working fine when the DAG is scheduled by Airflow.

When performing backfill operations, we would like to be able to manually trigger the DAG to extract a wider date range by supplying a DAG parameter and using it in the WHERE clause like:

WHERE date BETWEEN '{{ params.start_date }}' AND '{{ ds }}'

The Trigger DAG w/ config function in the Airflow UI can be used to accomplish this.
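To make the two WHERE clauses concrete, the sketch below renders them with plain jinja2, the templating engine Airflow uses, outside of Airflow. The context dict is a hand-built assumption standing in for the values Airflow supplies at run time, and the dates are illustrative.

```python
from jinja2 import Template

# Daily and backfill variants of the WHERE clause from the question.
DAILY_SQL = "WHERE date BETWEEN '{{ ds }}' AND '{{ ds }}'"
BACKFILL_SQL = "WHERE date BETWEEN '{{ params.start_date }}' AND '{{ ds }}'"

# Illustrative stand-in for the template context Airflow would build for a run.
context = {"ds": "2023-10-28", "params": {"start_date": "2023-01-01"}}

daily = Template(DAILY_SQL).render(**context)
backfill = Template(BACKFILL_SQL).render(**context)
print(daily)
print(backfill)
```

Jinja resolves params.start_date through a dict lookup, so a plain dict is enough to mimic the params object here.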

We need the same SQL to work for both daily "day-at-a-time" runs and backfill operations.

During a scheduled run, the start_date parameter defaults to:

{
    "start_date": "{{ ds }}",
    "start_date_alt": "ds"
}

When we manually trigger a backfill, a static date is supplied instead, e.g.:

{
    "start_date": "2023-01-01",
    "start_date_alt": "2023-01-01"
}
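A simplified sketch of how the trigger-time config ends up replacing these defaults: in Airflow 2.x, values from the run's config override matching params keys when the dag_run_conf_overrides_params setting is enabled (its default). The plain-dict merge below ignores Param validation and only approximates what Airflow does internally; effective_params is a hypothetical helper.

```python
# Defaults declared on the DAG, as in the question.
DEFAULT_PARAMS = {"start_date": "{{ ds }}", "start_date_alt": "ds"}


def effective_params(run_conf: dict) -> dict:
    """Hypothetical helper: approximate the params a task sees for a given run config."""
    # Keys supplied at trigger time win; anything not supplied keeps its default.
    return {**DEFAULT_PARAMS, **run_conf}


scheduled = effective_params({})
backfill = effective_params({"start_date": "2023-01-01", "start_date_alt": "2023-01-01"})
print(scheduled)
print(backfill)
```

In the scheduled case the task still receives the literal string "{{ ds }}", which is the root of the problem described below.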

The contrived DAG example below illustrates the issue with a simple BashOperator, but in our real-world use case it is a GlueJobOperator.

For a manually triggered run, the output from the BashOperator in the example is:

Extracting data from 2023-01-01 2023-01-01 to 2023-10-28

which is what we intend.

However for a scheduled DAG run, the output from the BashOperator is:

Extracting data from {{ ds }} ds to 2023-10-27

This is not what we intended; we need it to be, e.g.:

Extracting data from 2023-10-28 2023-10-28 to 2023-10-28

The issue seems to be that the value returned by {{ params.start_date }} is itself another Jinja template that needs to be evaluated, but this is not happening.

I have tried removing the braces in params.start_date_alt to see if that fixes the issue, but it doesn't.
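A minimal reproduction with plain jinja2 shows why. It assumes Airflow's rendering can be approximated by a single Template.render call with a hand-built context. Jinja inserts the value of params.start_date as text and never re-evaluates it; rendering the output a second time expands the leftover {{ ds }}. It also shows why dropping the braces does not help: a bare ds is an ordinary string, not a variable reference.

```python
from jinja2 import Template

# Defaults from the question: start_date holds a Jinja expression, start_date_alt a bare name.
context = {
    "ds": "2023-10-28",
    "params": {"start_date": "{{ ds }}", "start_date_alt": "ds"},
}
command = "echo Extracting data from {{ params.start_date }} {{ params.start_date_alt }} to {{ ds }}"

# One pass: param values are substituted as plain text and are not evaluated again.
once = Template(command).render(**context)

# A second pass over that output expands the leftover "{{ ds }}".
# The bare "ds" is just text, so no number of passes turns it into a date.
twice = Template(once).render(**context)
print(once)
print(twice)
```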

How can we get the value returned by {{ params.start_date }} to be evaluated as a Jinja template when it is passed to the BashOperator?

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

DEFAULT_ARGS = {
    "start_date": datetime(2023, 9, 1),
}

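with DAG(
    dag_id="recursive-template",
    default_args=DEFAULT_ARGS,
    schedule_interval="@daily",
    # Defaults used by scheduled runs; a manual trigger overrides them via the run config.
    params={
        "start_date": "{{ ds }}",
        "start_date_alt": "ds"
    }
) as dag:
    # Stand-in for the real GlueJobOperator: prints the date range it would extract.
    bash_task = BashOperator(
        task_id="echo_dates",
        bash_command="echo Extracting data from {{ params.start_date }} {{ params.start_date_alt }} to {{ ds }}"
    )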
Answer by Tim James

I have a solution/work-around, which is to ensure that templates are rendered twice by adding a pre_execute hook to the BashOperator, thanks to ardan's answer to "Make custom Airflow macros expand other macros". Perhaps there's another way of doing this?

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

DEFAULT_ARGS = {
    "start_date": datetime(2023, 9, 1),
}

def pre_execute(context):
    # Airflow renders the template fields once before this hook is called.
    # Rendering them again here expands the "{{ ds }}" that params.start_date
    # leaves behind after the first pass.
    context['ti'].render_templates()

with DAG(
    dag_id="recursive-template",
    default_args=DEFAULT_ARGS,
    schedule_interval="@daily",
    params={
        "start_date": "{{ ds }}"
    },
) as dag:
    bash_task = BashOperator(
        task_id="echo_dates",
        bash_command="echo Extracting data from {{ params.start_date }} to {{ ds }}",
        # Hook that triggers the second rendering pass before execute() runs.
        pre_execute=pre_execute
    )
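One possible alternative that needs only a single rendering pass, offered as a sketch rather than a tested DAG: give start_date an empty default (for example params={"start_date": ""}, an assumption not in the original) and let the template fall back to ds with Jinja's `or` operator. The snippet checks the expression with plain jinja2 and a hand-built context instead of a running Airflow instance; render is a hypothetical helper.

```python
from jinja2 import Template

# Fall back to ds when no start_date was supplied at trigger time.
COMMAND = "echo Extracting data from {{ params.start_date or ds }} to {{ ds }}"


def render(start_date: str, ds: str = "2023-10-28") -> str:
    """Hypothetical helper: render COMMAND for a run with the given start_date param."""
    context = {"ds": ds, "params": {"start_date": start_date}}
    return Template(COMMAND).render(**context)


scheduled = render("")           # scheduled run: empty default, so ds is used
backfill = render("2023-01-01")  # manual trigger with a static start date
print(scheduled)
print(backfill)
```

In a DAG, the same expression would go directly into bash_command (or the templated arguments of the real operator), and a manually triggered run that supplies start_date in its config would override the empty default.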