The Airflow 2.7 documentation for Dynamic Task Mapping states that "mapped tasks are created based on the output of a previous task".
Neither the docs nor the examples say how to create a mapped task from the output of an arbitrary upstream task rather than the immediately preceding one.
For example, take a DAG like A >> B >> C[], where C[] should be mapped over the results of A (not B). Building it creates an additional A >> C[] dependency, which I don't need (see the Graph view).
Code example:
from airflow.decorators import dag, task, task_group
from datetime import datetime
import random as rnd


@task
def A():
    # Returns a list of random length (1 to 10) to map over
    return [x for x in range(int(rnd.random()*10)+1)]


@task
def B():
    print("intermediate")


@task
def C(x):
    return x + 10


@dag(dag_id="TEST_DYNAMIC_SIMPLE", start_date=datetime(2023, 10, 11), catchup=False, schedule=None, tags=["TESTS"])
def TEST_DYNAMIC_SIMPLE():
    tA = A()
    # C is mapped over the output of A, not B
    exC = C.expand(x=tA)
    tA >> B() >> exC


globals()["TEST_DYNAMIC_SIMPLE"] = TEST_DYNAMIC_SIMPLE()

# Local test
if __name__ == "__main__":
    dag = globals()['TEST_DYNAMIC_SIMPLE']
    dag.test()
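
To make the extra edge concrete, here is a minimal sketch in plain Python (no Airflow involved) that models the three dependencies seen in the Graph view. The edge set and the helper names `reachable` and `redundant_edges` are hypothetical, made up for illustration. The assumption is that the A >> C[] edge appears because passing a task's output as an argument (here `C.expand(x=tA)`) also registers that task as upstream. The sketch only checks which edges are already implied by the others; it does not show how to remove the edge in Airflow.

```python
from collections import defaultdict

# Hypothetical model of the edges shown in the Graph view:
# A >> B and B >> C come from the explicit chain, A >> C from C.expand(x=tA).
edges = {("A", "B"), ("B", "C"), ("A", "C")}


def reachable(edges, start, goal, skip=None):
    """Return True if goal is reachable from start without using the edge `skip`."""
    graph = defaultdict(set)
    for up, down in edges:
        if (up, down) != skip:
            graph[up].add(down)
    stack, seen = [start], set()
    while stack:
        node = stack.pop()
        if node == goal:
            return True
        if node not in seen:
            seen.add(node)
            stack.extend(graph[node])
    return False


def redundant_edges(edges):
    """Edges whose ordering constraint is already implied by a longer path."""
    return sorted(e for e in edges if reachable(edges, e[0], e[1], skip=e))


print(redundant_edges(edges))
```

In this model the A >> C edge is the only one implied by the remaining edges, so it adds no ordering constraint beyond A >> B >> C; C[] still cannot start before B finishes.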