Airflow - how to define task dependencies between DummyOperators and task group - using decorators syntax

I am going through the Airflow documentation and the Astronomer guide at https://docs.astronomer.io/learn/managing-dependencies?tab=taskflow#dependencies-in-dynamic-task-mapping to try the new features introduced in Airflow 2. Currently, I am testing task dependencies, and I am a bit confused about how to use TaskGroups.

I am trying to create a simple DAG that combines dependencies between tasks inside a TaskGroup with tasks outside the group, as shown in the example below:

from datetime import datetime

from airflow.decorators import dag, task_group
from airflow.models.baseoperator import chain
from airflow.models.dag import DagContext
from airflow.operators.dummy import DummyOperator
from airflow.providers.ssh.operators.ssh import SSHOperator


@dag(
    start_date=datetime(2021, 4, 20, 15, 0),
    concurrency=5,
    max_active_runs=1,
    catchup=False,
    schedule_interval="0 17 * * *"
)
def sv_load():
    start = DummyOperator(task_id="start")
    end = DummyOperator(task_id="end")

    @task_group(group_id="tg_test")
    def tg1():
        t1 = DummyOperator(task_id="task1")
        t2 = DummyOperator(task_id="task2")

        t1 >> t2


    start >> tg1() >> end

dag = sv_load()

But this code gives me an error:

AttributeError: 'NoneType' object has no attribute 'update_relative'

What is wrong with this code? How do I combine dependencies between tasks inside a TaskGroup and tasks outside the group? I explicitly want to use the decorator (TaskFlow API) syntax.

1 Answer

Answered by Danila Ganchar

I haven't used the @task_group decorator, but you can use the TaskGroup class instead. Here is an example:

from datetime import datetime

from airflow.decorators import dag
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup


@dag(
    start_date=datetime(2021, 4, 20, 15, 0),
    concurrency=5,
    max_active_runs=1,
    catchup=False,
    schedule_interval='0 17 * * *'
)
def sv_load():
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

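    # Tasks created inside the context manager are added to the group.
    with TaskGroup('tg_test', tooltip='Tasks for inner_section2') as tg_test:
        t1 = DummyOperator(task_id='task1')
        t2 = DummyOperator(task_id='task2')

        t1 >> t2

    # The TaskGroup object itself can be used in a dependency chain.
    start >> tg_test >> end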


dag = sv_load()
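
As for why the decorator version in the question raises that AttributeError: the likely cause is that tg1() has no return statement, so the call evaluates to None and the chain becomes start >> None >> end. The sketch below reproduces that mechanism in plain Python. ToyTask and group_without_return are hypothetical stand-ins invented for illustration; this is a toy model, not Airflow's actual implementation.

```python
# Toy model only: ToyTask is a hypothetical stand-in, not an Airflow class.


class ToyTask:
    """Minimal object that supports the `>>` dependency syntax."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def update_relative(self, other, upstream=True):
        # Hook called on the right-hand operand of `>>`; nothing to do here.
        pass

    def __rshift__(self, other):
        # `self >> other`: notify the right-hand side, then record the edge.
        # If `other` is None, the attribute lookup on the next line fails.
        other.update_relative(self, upstream=True)
        self.downstream.append(other.task_id)
        return other


def group_without_return():
    """Mimics the body of tg1() in the question: builds tasks, returns nothing."""
    t1 = ToyTask("task1")
    t2 = ToyTask("task2")
    t1 >> t2
    # No return statement, so calling this function yields None.


start = ToyTask("start")

try:
    start >> group_without_return()
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

print(error_message)
```

With the TaskGroup context manager shown above, the name bound by `as tg_test` is a real object, so the chain has something to attach to on both sides. As far as I understand, older Airflow 2 releases simply forwarded the decorated function's return value, while newer releases return the group itself when the function returns nothing, so the original decorator code may work after an upgrade. Please verify that against the version you are running.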