I am working on an example similar to the OpenLineage Airflow quickstart (https://openlineage.io/docs/guides/airflow-quickstart/). In that guide, Marquez is already aware of the DAG flow in Airflow without much configuration.
[Screenshot: Marquez aware of Airflow DAGs]
I have done something similar, and I was hoping that Marquez would also know how the different elements of my DAG relate to each other. However, the DAG that I wrote shows up as disjointed jobs. What needs to be configured to tell Marquez that the tasks in my DAG are connected?
    from airflow import DAG
    from airflow.decorators import task
    from airflow.datasets import Dataset
    from airflow.operators.python import PythonOperator
    import pandas as pd
    from datetime import datetime
    from model1 import calculate_statistics
    from model2 import calculate_z_score

    dataset = Dataset("data/my-data.csv")


    def _model1_calc_stats():
        data = pd.read_csv(dataset.uri)
        mean_age, std_age = calculate_statistics(data)
        return mean_age, std_age


    def _model2_z_score(ti):
        response = ti.xcom_pull()
        mean_age, std_age = response
        data = pd.read_csv(dataset.uri)
        z_scores = calculate_z_score(data, mean_age, std_age)
        print(z_scores)
        return z_scores


    with DAG(
        "pandas_write_and_read", schedule="@daily", start_date=datetime(2021, 12, 1), catchup=False
    ):
        model1_calc_stats = PythonOperator(
            task_id="model1_calc_stats", python_callable=_model1_calc_stats
        )
        model2_z_score = PythonOperator(
            task_id="model2_z_score", python_callable=_model2_z_score
        )
        model1_calc_stats >> model2_z_score
I have already tried aligning my code as much as possible with the example code.
I also tried sending explicit calls to the Marquez client. These did not succeed. The error messages suggest that the Marquez client should not be used for working with datasets, and that the OpenLineage client should be used instead.
When I use the OpenLineage client, it can locate localhost:5000, but not localhost:5000/api/v1/lineage.
Besides, the example does not mention running the OpenLineage client separately.
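For reference, here is a minimal sketch of what an explicit call could look like, using only the Python standard library instead of either client. It assumes Marquez accepts OpenLineage run events as JSON via POST at localhost:5000/api/v1/lineage, and that Marquez joins jobs through shared datasets (one job's output being another job's input). The job namespace, the producer URL, the `file` dataset namespace, and the intermediate dataset `data/age-stats.csv` are hypothetical names chosen for illustration; my DAG actually passes the statistics through XCom, not a file.

```python
import json
import uuid
from datetime import datetime, timezone
from urllib import request

# Assumption: Marquez runs locally and accepts OpenLineage events at this path.
LINEAGE_URL = "http://localhost:5000/api/v1/lineage"

# Hypothetical identifiers, for illustration only.
PRODUCER = "https://example.com/manual-lineage-sketch"
JOB_NAMESPACE = "my-airflow"
DATA_NAMESPACE = "file"
SCHEMA_URL = "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent"


def build_run_event(job_name, inputs, outputs, event_type="COMPLETE"):
    """Build a minimal OpenLineage run event as a plain dict."""

    def as_datasets(names):
        return [{"namespace": DATA_NAMESPACE, "name": name} for name in names]

    return {
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": str(uuid.uuid4())},
        "job": {"namespace": JOB_NAMESPACE, "name": job_name},
        "inputs": as_datasets(inputs),
        "outputs": as_datasets(outputs),
        "producer": PRODUCER,
        "schemaURL": SCHEMA_URL,
    }


def send_event(event, url=LINEAGE_URL):
    """POST one event as JSON and return the HTTP status code."""
    req = request.Request(
        url,
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with request.urlopen(req) as resp:
        return resp.status


# The first job writes a (hypothetical) stats dataset ...
stats_event = build_run_event(
    "pandas_write_and_read.model1_calc_stats",
    inputs=["data/my-data.csv"],
    outputs=["data/age-stats.csv"],
)
# ... and the second job reads it, which is what would join the two jobs.
z_event = build_run_event(
    "pandas_write_and_read.model2_z_score",
    inputs=["data/my-data.csv", "data/age-stats.csv"],
    outputs=[],
)
```

With Marquez running, `send_event(stats_event)` followed by `send_event(z_event)` would submit both events. Nothing is sent when the module is loaded, so the events can be inspected first with `print(json.dumps(z_event, indent=2))`.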