Airflow - Splitting DAG definition across multiple files

Just getting started with Airflow and wondering what best practices are for structuring large DAGs. For our ETL, we have a lot of tasks that fall into logical groupings, yet the groups are dependent on each other. Which of the following would be considered best practice?

  • One large DAG file with all tasks in that file
  • Splitting the DAG definition across multiple files (How to do this?)
  • Define multiple DAGs, one for each group of tasks, and set dependencies between them using ExternalTaskSensor

Also open to other suggestions.

There are 3 answers

Vineet Goel (BEST ANSWER)

DAGs are just Python files, so you can split a single DAG definition across multiple files. Each file should just expose functions that take in a DAG object and create tasks on it.

Note, though, that you should have only a single DAG object in the global scope: Airflow picks up every DAG object in the global scope as a separate DAG.
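
For example, a minimal sketch of this pattern (file, function, and task names here are hypothetical, using Airflow 2-style imports):

dags/etl_dag.py:

from datetime import datetime

from airflow import DAG

# helper module that lives next to this file; the dags/ folder is on sys.path
from etl_tasks import add_extract_task

# the only DAG object in this file's global scope
with DAG(dag_id="etl", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    extract_task = add_extract_task(dag)

dags/etl_tasks.py:

from airflow.operators.bash import BashOperator

def add_extract_task(dag):
    # no DAG object at module level -- just a builder that attaches one task
    return BashOperator(task_id="extract", bash_command="echo extracting", dag=dag)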

It is often considered good practice to keep each DAG as concise as possible. However, if you need to set up such dependencies, you could consider using SubDAGs. More about this here: https://airflow.incubator.apache.org/concepts.html?highlight=subdag#scope
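
As a rough sketch of the SubDAG approach (DAG and task ids here are hypothetical, and the import paths are the Airflow 1.x-era ones contemporary with this answer):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

default_args = {"start_date": datetime(2021, 1, 1)}

def load_subdag(parent_dag_id, child_task_id):
    # the child's dag_id must be "<parent dag_id>.<SubDagOperator task_id>"
    with DAG(dag_id=f"{parent_dag_id}.{child_task_id}",
             default_args=default_args, schedule_interval=None) as subdag:
        DummyOperator(task_id="step_one") >> DummyOperator(task_id="step_two")
    return subdag

with DAG(dag_id="my_dag", default_args=default_args, schedule_interval=None) as dag:
    load_group = SubDagOperator(task_id="load", subdag=load_subdag("my_dag", "load"))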

You could also use ExternalTaskSensor, but beware that as the number of DAGs grows, it might get harder to handle external dependencies between tasks. I think SubDAGs might be the way to go for your use case.
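
If you do go the multiple-DAG route, the sensor wiring in the downstream DAG might look roughly like this (DAG and task ids are hypothetical; by default the sensor matches on execution date, so the two DAGs' schedules need to line up):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

with DAG(dag_id="transform_dag", start_date=datetime(2021, 1, 1),
         schedule_interval="@daily") as dag:
    # blocks until extract_dag's "extract_done" task succeeds for the same execution date
    wait_for_extract = ExternalTaskSensor(
        task_id="wait_for_extract",
        external_dag_id="extract_dag",
        external_task_id="extract_done",
    )
    wait_for_extract >> DummyOperator(task_id="transform")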

Sergey Shcherbakov

It seems that it is possible to place your Python modules into the plugins/ subfolder and import them from the DAG file:

https://airflow.apache.org/docs/apache-airflow/stable/plugins.html
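
Since Airflow adds the plugins/ folder to sys.path, a module placed there can be imported directly from a DAG file. A minimal sketch (module, function, and task names are hypothetical):

plugins/helpers.py:

from airflow.operators.bash import BashOperator

def build_hello_task(dag):
    return BashOperator(task_id="hello", bash_command="echo hello", dag=dag)

dags/hello_dag.py:

from datetime import datetime

from airflow import DAG

from helpers import build_hello_task  # resolves because plugins/ is on sys.path

with DAG(dag_id="hello_dag", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    hello_task = build_hello_task(dag)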

L. D. Nicolas May

With the advent of TaskGroups in Airflow 2.x, it's worth expanding on a previous answer. TaskGroups are just UI groupings for tasks, but they also serve as handy logical groupings for a bunch of related tasks. The tasks in a TaskGroup can be bundled and abstracted away to make it easier to build a DAG out of larger pieces. That being said, it may still be useful to have a file full of related tasks without bundling them into a TaskGroup.

The trick to breaking up DAGs is to have the DAG in one file, for example my_dag.py, and the logical chunks of tasks or TaskGroups in separate files, with one logical task chunk or TaskGroup per file. Each file contains functions (or methods if you want to take an OO approach) each of which returns an operator instance or a TaskGroup instance.

To illustrate, my_dag.py (below) imports operator-returning functions from foo_bar_tasks.py, and it imports a TaskGroup-returning function from xyzzy_taskgroup.py. Within the DAG context, those functions are called and their return values are assigned to task or TaskGroup variables, which can be assigned up-/downstream dependencies.

dags/my_dag.py:

# std lib imports
 
from airflow import DAG
# other airflow imports
 
from includes.foo_bar_tasks import build_foo_task, build_bar_task
from includes.xyzzy_taskgroup import build_xyzzy_taskgroup
 
with DAG(dag_id="my_dag", ...) as dag:
 
    # logical chunk of tasks
    foo_task = build_foo_task(dag=dag, ...)
    bar_task = build_bar_task(dag=dag, ...)
 
    # taskgroup
    xyzzy_taskgroup = build_xyzzy_taskgroup(dag=dag, ...)
 
    foo_task >> bar_task >> xyzzy_taskgroup

plugins/includes/foo_bar_tasks.py:

# std lib imports
 
from airflow import DAG
from airflow.operators.foo import FooOperator
from airflow.operators.bar import BarOperator
# other airflow imports
 
def build_foo_task(dag: DAG, ...) -> FooOperator:
    # ... logic here ...
    foo_task = FooOperator(..., dag=dag)
 
    return foo_task
 
def build_bar_task(dag: DAG, ...) -> BarOperator:
    # ... logic here ...
    bar_task = BarOperator(..., dag=dag)
 
    return bar_task

plugins/includes/xyzzy_taskgroup.py:

# std lib imports
 
from airflow import DAG
from airflow.operators.baz import BazOperator
from airflow.operators.qux import QuxOperator
from airflow.utils.task_group import TaskGroup
# other airflow imports
 
def build_xyzzy_taskgroup(dag: DAG, ...) -> TaskGroup:
    xyzzy_taskgroup = TaskGroup(group_id="xyzzy_taskgroup")
 
    # ... logic here ...
    baz_task = BazOperator(task_id="baz_task", task_group=xyzzy_taskgroup, ...)
 
    # ... logic here ...
    qux_task = QuxOperator(task_id="qux_task", task_group=xyzzy_taskgroup, ...)
 
    baz_task >> qux_task
 
    return xyzzy_taskgroup