Apache Airflow TaskFlow API data pipeline

Possibly I misunderstand something. I'm trying to build a DAG with the new TaskFlow API approach. Here is the example from the docs:

import json

import pendulum

from airflow.decorators import dag, task


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    @task()
    def extract():
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        total_order_value = 0
        for value in order_data_dict.values():
            total_order_value += value
        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


tutorial_taskflow_api()

This works well. But what if I want to build a pipeline without passing data directly between tasks, i.e. tasks that have no return values? For example, in the old way it would look something like this (with PythonOperator):

from airflow.operators.python import PythonOperator

def extract():
    ...  # extract to storage, nothing returned

def transform():
    ...  # transform the extracted data, nothing returned

def load():
    ...  # load the transformed data, nothing returned

extract_task = PythonOperator(task_id="extract", python_callable=extract)
transform_task = PythonOperator(task_id="transform", python_callable=transform)
load_task = PythonOperator(task_id="load", python_callable=load)
extract_task >> transform_task >> load_task

But how can I build this flow with the new TaskFlow API approach?
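
From what I can tell, a call to a @task-decorated function returns an XComArg, and those can be chained with >> just like operators, so I would guess the ordering-only version looks something like this (just a sketch, untested):

@task()
def extract():
    ...  # write raw data to storage, return nothing

@task()
def transform():
    ...  # transform the stored data in place, return nothing

@task()
def load():
    ...  # load the transformed data, return nothing

# calling each task creates a task instance; >> sets the ordering
# without transferring any data between the tasks
extract() >> transform() >> load()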

In some examples I found something like this:

store_data(process_data(extract_bitcoin_price()))

But I still don't understand what to do when one task has a return value and another doesn't. Is there a good example that would help me understand? Many thanks.
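
For the mixed case, I imagine the two styles can be combined: pass the return value as an argument where there is one, and chain with >> where there isn't (again just a sketch, with made-up task names):

@task()
def extract() -> dict:
    return {"1001": 301.27}  # returned value is shared via XCom

@task()
def transform(order_data: dict):
    ...  # consumes the data, writes results to storage, returns nothing

@task()
def load():
    ...  # reads the results from storage, returns nothing

transformed = transform(extract())  # data dependency: extract -> transform
transformed >> load()               # ordering-only dependency: transform -> load

Is this the intended pattern?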
