I may be misunderstanding something: I'm trying to build a DAG using the new TaskFlow API. Here is the example from the docs:
import json

import pendulum

from airflow.decorators import dag, task


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    @task()
    def extract():
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        total_order_value = 0
        for value in order_data_dict.values():
            total_order_value += value
        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


tutorial_taskflow_api()
This works well. But what if I want to build a pipeline without passing data directly between tasks, i.e. with tasks that return nothing? For example, in the old style:
def extract():
    # extract to storage, no return value
    ...

def transform():
    # transform the extracted data, no return value
    ...

def load():
    # load the transformed data, no return value
    ...

extract >> transform >> load
But how can I build this flow with the new TaskFlow API?
In some examples I've found something like this:
store_data(process_data(extract_bitcoin_price()))
But I still don't understand how to handle the case where one task has a return value and another doesn't. Is there a good example that makes this clear? Many thanks.