We have an Airflow DAG that extracts data from a database table. It usually extracts one day of data at a time and exports it to another system. The DAG is scheduled `@daily` and extracts data for the given logical date `{{ ds }}` using a SQL `WHERE` clause like:

```sql
WHERE date BETWEEN '{{ ds }}' AND '{{ ds }}'
```
This is working fine when the DAG is scheduled by Airflow.
When performing backfill operations, we would like to be able to manually trigger the DAG to extract a wider date range by supplying a DAG parameter and using it in the `WHERE` clause like:

```sql
WHERE date BETWEEN '{{ params.start_date }}' AND '{{ ds }}'
```

The **Trigger DAG w/ config** function in the Airflow UI can be used to accomplish this.
We need the SQL to work for daily "day-at-a-time" and backfill operations.
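To make the two `WHERE` clauses concrete, here is a minimal sketch that renders them with plain Jinja2 (the templating library Airflow uses) outside Airflow. The `context` dict is a simplified, hypothetical stand-in for Airflow's template context: `ds` is a plain string and `params` is an ordinary dict holding a static date, as in a manual backfill.

```python
from jinja2 import Template

# Hypothetical, simplified stand-in for Airflow's template context (not the real object).
context = {
    "ds": "2023-10-27",
    "params": {"start_date": "2023-01-01"},  # static value, as supplied for a backfill
}

daily_where = "WHERE date BETWEEN '{{ ds }}' AND '{{ ds }}'"
backfill_where = "WHERE date BETWEEN '{{ params.start_date }}' AND '{{ ds }}'"

daily_sql = Template(daily_where).render(**context)
backfill_sql = Template(backfill_where).render(**context)
print(daily_sql)     # WHERE date BETWEEN '2023-10-27' AND '2023-10-27'
print(backfill_sql)  # WHERE date BETWEEN '2023-01-01' AND '2023-10-27'
```

With a static date in `params`, a single rendering pass is enough; the problem only appears when the parameter value is itself a template.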
During a scheduled run, the `start_date` parameter defaults to:

```json
{
  "start_date": "{{ ds }}",
  "start_date_alt": "ds"
}
```
and when we manually trigger a backfill, a static date would be supplied, e.g.:

```json
{
  "start_date": "2023-01-01",
  "start_date_alt": "2023-01-01"
}
```
This contrived DAG example illustrates the issue with a simple `BashOperator`, but in our real-world use case it is a `GlueJobOperator`.
For a manually triggered run, the output from the `BashOperator` in the example is:

```
Extracting data from 2023-01-01 2023-01-01 to 2023-10-28
```

which is what we intend.
However, for a scheduled DAG run, the output from the `BashOperator` is:

```
Extracting data from {{ ds }} ds to 2023-10-27
```

which is not what we intended. We need it to be, e.g.:

```
Extracting data from 2023-10-28 2023-10-28 to 2023-10-28
```
The issue seems to be that the value returned by `{{ params.start_date }}` is itself another Jinja template which needs to be evaluated, but this is not happening.

I have tried removing the braces in `params.start_date_alt` to see if that fixes the issue, but it doesn't.
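The behaviour described above can be reproduced with plain Jinja2 outside Airflow. A single render inserts the parameter's value verbatim without evaluating it, so the literal text `{{ ds }}` survives; rendering that output a second time resolves it. It also suggests why removing the braces does not help: `ds` without braces is plain text, so no number of passes turns it into a date. The `context` dict is a simplified, hypothetical stand-in for a scheduled run's Airflow context.

```python
from jinja2 import Template

# Simplified stand-in for a scheduled run's context (not Airflow's real context object).
context = {
    "ds": "2023-10-27",
    "params": {"start_date": "{{ ds }}", "start_date_alt": "ds"},
}

command = "echo Extracting data from {{ params.start_date }} {{ params.start_date_alt }} to {{ ds }}"

# First pass: parameter values are inserted as plain text, not evaluated as templates.
first_pass = Template(command).render(**context)
print(first_pass)   # echo Extracting data from {{ ds }} ds to 2023-10-27

# Second pass: the leftover "{{ ds }}" is now part of the template source, so it resolves;
# the brace-less "ds" is still just text.
second_pass = Template(first_pass).render(**context)
print(second_pass)  # echo Extracting data from 2023-10-27 ds to 2023-10-27
```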
How can we get the value returned by `{{ params.start_date }}` to be evaluated as a Jinja template when passed to the `BashOperator`?
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

DEFAULT_ARGS = {
    "start_date": datetime(2023, 9, 1),
}

with DAG(
    dag_id="recursive-template",
    default_args=DEFAULT_ARGS,
    schedule_interval="@daily",
    # Defaults used by scheduled runs; a manual trigger supplies static dates instead.
    params={
        "start_date": "{{ ds }}",  # nested template: not rendered a second time
        "start_date_alt": "ds",
    },
) as dag:
    bash_task = BashOperator(
        task_id="echo_dates",
        bash_command="echo Extracting data from {{ params.start_date }} {{ params.start_date_alt }} to {{ ds }}",
    )
```
I have a solution/workaround, which is to ensure that templates are rendered twice by adding a `pre_execute` hook to the `BashOperator`, thanks to ardan's answer to "Make custom Airflow macros expand other macros". Perhaps there's another way of doing this?
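One possible alternative, not from the original post and not verified against a running Airflow deployment, is to avoid storing a template inside the parameter at all: give `start_date` an empty default and let the command template fall back to `ds` using Jinja's `or` operator, e.g. `{{ params.start_date or ds }}`. The sketch below checks only the Jinja expression with plain Jinja2; the `render` helper and its simplified context are hypothetical, and whether an empty-string default suits your `params` validation is an assumption to confirm.

```python
from jinja2 import Template

# Hypothetical alternative: the param holds "" for scheduled runs or a static date for a
# manual backfill, and the template itself picks the fallback, so one render pass suffices.
command = "echo Extracting data from {{ params.start_date or ds }} to {{ ds }}"


def render(params: dict, ds: str) -> str:
    """Render the command once with a simplified stand-in for Airflow's context."""
    return Template(command).render(params=params, ds=ds)


scheduled = render({"start_date": ""}, ds="2023-10-27")
manual = render({"start_date": "2023-01-01"}, ds="2023-10-28")
print(scheduled)  # echo Extracting data from 2023-10-27 to 2023-10-27
print(manual)     # echo Extracting data from 2023-01-01 to 2023-10-28
```

In the DAG this would presumably mean `params={"start_date": ""}` and the same `or ds` fallback in the `WHERE` clause or the Glue job arguments.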