Create a daily DAG that will run for multiple days


I want to create a daily DAG that reads a file based on the date the DAG execution started. The file contains the list of folders to be processed for that day. The number of folders can change every day. If there are a lot of folders, the DAG can take multiple days to complete.

For each folder in the file for the day that the DAG execution started, we have to perform a few different DAG tasks (PythonOperator, BashOperator, etc.). Even if the current date changes and the DAG takes more than 24 hours, it should continue processing the list of folders obtained above.

The issue is that global Python variables in the DAG get updated every time the DAG is parsed. So if I use a variable like this:

    DATE = datetime.now(tz).date()

Then the value of the DATE variable changes whenever the date changes.

So I tried saving the date in a file named after the DAG's run_id, so that the file name is always unique; the run_id of a DAG run is unique per run and stays constant while the DAG is running.

However, the next issue is getting the value and passing it across different tasks. For this, I was able to create a user_defined_macro and pass the date read from the file to all the tasks.
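
For reference, a minimal sketch of this workaround (the state directory and helper names here are hypothetical, not from my actual setup):

    import json
    from pathlib import Path

    STATE_DIR = Path("/tmp/dag_state")  # hypothetical location for per-run state files

    def save_run_date(**kwargs):
        # First task in the DAG: pin the date once, keyed by run_id,
        # so later re-parses of the DAG file cannot change it.
        STATE_DIR.mkdir(parents=True, exist_ok=True)
        state_file = STATE_DIR / f"{kwargs['run_id']}.json"
        if not state_file.exists():
            state_file.write_text(json.dumps({"date": kwargs["ds"]}))

    def read_run_date(run_id):
        # Registered as a user_defined_macro on the DAG, e.g.
        # DAG(..., user_defined_macros={"run_date": read_run_date}),
        # so templates can call {{ run_date(run_id) }} and every task
        # sees the same pinned date.
        return json.loads((STATE_DIR / f"{run_id}.json").read_text())["date"]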

After this, the next issue is how to make the list of tasks to be performed for each date work. I tried using a for loop, but obviously it doesn't work, since the Python code runs when the DAG is parsed, not when the DAG is executing. This is the last issue that I cannot figure out how to solve: basically, loop through every single folder for the particular date.
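
A minimal sketch of one way to defer the loop to run time, using Airflow's dynamic task mapping (available in Airflow 2.3+; the file path below is hypothetical):

    from airflow.decorators import task

    @task
    def list_folders(**kwargs):
        # Runs at execution time, so the list can come from the day's file.
        folders_file = f"/data/folders_{kwargs['ds']}.txt"  # hypothetical path
        with open(folders_file) as f:
            return [line.strip() for line in f if line.strip()]

    @task
    def process_folder(folder: str):
        print(f"processing {folder}")

    # One mapped task instance is created per folder, at run time.
    process_folder.expand(folder=list_folders())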


1 Answer

Answered by Andrey Anshin:

I'm pretty sure you've reinvented the default behaviour of Apache Airflow and its Data Intervals. I guess everything related to deterministic dates can be achieved by accessing logical_date, data_interval_start, or data_interval_end. You could choose one of the approaches below:

    from __future__ import annotations

    from datetime import datetime, timezone
    from typing import TYPE_CHECKING

    from airflow import DAG
    from airflow.decorators import task
    from airflow.models import BaseOperator
    from airflow.operators.python import PythonOperator, get_current_context

    if TYPE_CHECKING:
        from airflow.models.dagrun import DagRun
        from airflow.utils.context import Context


    class CustomOperator(BaseOperator):
        """Option 1: read the dates from the execution context in a custom operator."""

        def execute(self, context: Context):
            dag_run: DagRun = context["dag_run"]
            print(f"Logical Date: {dag_run.logical_date}")
            print(f"Data Interval Start: {dag_run.data_interval_start}")
            print(f"Data Interval End: {dag_run.data_interval_end}")


    def via_callable():
        # Option 2: fetch the context inside a PythonOperator callable.
        context = get_current_context()
        dag_run: DagRun = context["dag_run"]
        print(f"Logical Date: {dag_run.logical_date}")
        print(f"Data Interval Start: {dag_run.data_interval_start}")
        print(f"Data Interval End: {dag_run.data_interval_end}")


    with DAG(
        "so_dag",
        schedule="0 10 * * *",
        start_date=datetime(2024, 3, 1, tzinfo=timezone.utc),
        tags=["task-context"],
        catchup=False,
    ) as dag:

        @task
        def task_flow_example_via_kwargs(**kwargs):
            # Option 3: TaskFlow task, context passed in via **kwargs.
            dag_run: DagRun = kwargs["dag_run"]
            print(f"Logical Date: {dag_run.logical_date}")
            print(f"Data Interval Start: {dag_run.data_interval_start}")
            print(f"Data Interval End: {dag_run.data_interval_end}")

        @task
        def task_flow_example_via_runtime_magic(*, dag_run: DagRun):
            # Option 4: TaskFlow task, context variables injected by name.
            print(f"Logical Date: {dag_run.logical_date}")
            print(f"Data Interval Start: {dag_run.data_interval_start}")
            print(f"Data Interval End: {dag_run.data_interval_end}")

        task_flow_example_via_kwargs()
        task_flow_example_via_runtime_magic()
        CustomOperator(task_id="via_custom_operator")
        PythonOperator(task_id="via_callable", python_callable=via_callable)

Please note that the options are not limited to those listed above; the same values can also be accessed in Templates.
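
For example, a minimal sketch using Airflow's built-in templated variables inside the DAG above:

    from airflow.operators.bash import BashOperator

    # data_interval_start and data_interval_end are built-in Jinja
    # template variables, rendered per DAG run.
    BashOperator(
        task_id="via_template",
        bash_command="echo '{{ data_interval_start }} -> {{ data_interval_end }}'",
    )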