I am going through Airflow documentation and documentation from https://docs.astronomer.io/learn/managing-dependencies?tab=taskflow#dependencies-in-dynamic-task-mapping to try new features coming with Airflow 2. Currently, I am testing task dependencies and I am bit confused about how to use TaskGroups.
I am trying to create a simple DAG in which I want to include TaskGroup task dependencies in combination with tasks outside from a group, as shown in example below:
from datetime import datetime
from airflow.decorators import dag, task_group
from airflow.models.baseoperator import chain
from airflow.models.dag import DagContext
from airflow.operators.dummy import DummyOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
@dag(
start_date=datetime(2021, 4, 20, 15, 0),
concurrency=5,
max_active_runs=1,
catchup=False,
schedule_interval="0 17 * * *"
)
def sv_load():
start = DummyOperator(task_id="start")
end = DummyOperator(task_id="end")
@task_group(group_id="tg_test")
def tg1():
t1 = DummyOperator(task_id="task1")
t2 = DummyOperator(task_id="task2")
t1 >> t2
start >> tg1() >> end
dag = sv_load()
But this code gives me an error:
AttributeError: 'NoneType' object has no attribute 'update_relative'
What is wrong with this code? How to combine task dependencies between tasks from Task Groups with tasks outside of a group? I explicitly want to use task decorator (TaskFlow API) syntax.
Didn't use
@task_group
decorator. You can useTaskGroup
class. Here is an example: