Airflow: Dynamic Task Mapping: Expand on results of any upstream task, not only previous one

438 views Asked by At

Airflow 2.7 documentation for Dynamic Task Mapping states that "mapped tasks are created based on the output of a previous task".

Nothing is said in docs and examples about how to create mapped task based on output of any other upstream task, not only previous one.

For example, trying to create a DAG like this: A >> B >> C[], where C[] should be mapped based on results of A (not B) creates additional A >> C[] dependency, which I don't need. See Graph view

Code example:

from airflow.decorators import dag, task, task_group
from datetime import datetime
import random as rnd

@task
def A():
    return [x for x in range(int(rnd.random()*10)+1)]

@task
def B():
    print("intermediate")

@task
def C(x):
    return x + 10

@dag(dag_id="TEST_DYNAMIC_SIMPLE",start_date=datetime(2023, 10, 11), catchup=False,schedule=None,tags=["TESTS"])
def TEST_DYNAMIC_SIMPLE():
    tA = A()
    exC = C.expand(x=tA)
    tA >> B() >> exC

globals()["TEST_DYNAMIC_SIMPLE"] = TEST_DYNAMIC_SIMPLE() 

# Local test
if __name__ == "__main__":
    dag = globals()['TEST_DYNAMIC_SIMPLE']        
    dag.test()
0

There are 0 answers