How to get the completion status of a Dataprep flow run with Airflow's SimpleHttpOperator


I'm building a DAG in Airflow that interacts with Google BigQuery and Trifacta Dataprep. I use the SimpleHttpOperator to trigger a Dataprep flow, which has two output nodes. Our company VM runs an older version of Python, so I can't use newer operators tailored for Dataprep.

Problem:

My SimpleHttpOperator is marked as "Success" as soon as the POST request is successfully sent to Dataprep. However, if one of the Dataprep flow's output nodes fails, the task still shows as "Success" in Airflow. I'd like the task to be marked as "Success" only when the flow completes successfully.

I tried adding response_check=lambda response: response.status_code == 200 to the operator, but then the task fails, apparently because it never receives a 200 response.
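
One thing worth ruling out is that the endpoint answers with a different success code. The sketch below accepts any 2xx status instead of only 200. That the Dataprep endpoint replies with something like 201 Created is an assumption, not something confirmed here, and is_success is a hypothetical helper name.

```python
def is_success(response):
    """Return True for any 2xx HTTP status, not only 200."""
    return 200 <= response.status_code < 300
```

It could be passed to the operator as response_check=is_success. Even then it only confirms that the request was accepted, not that the flow finished.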

Additionally, I checked the XCom pushed by the task, but it seems to contain only the sessionID, flowID, and JobID.
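
Since SimpleHttpOperator pushes the response body as text, the job ID would have to be parsed out of that string before a follow-up request could use it. A minimal sketch, assuming the body is JSON; the key name "id" and the sample payload are placeholders for illustration, not the actual Dataprep response shape.

```python
import json


def extract_job_id(xcom_text, key="id"):
    """Parse the response text pushed to XCom and return the job ID.

    `key` is a placeholder: use whichever field holds the job ID
    in the real response.
    """
    payload = json.loads(xcom_text)
    if key not in payload:
        raise KeyError("job ID field %r not found in response" % key)
    return payload[key]


# Hypothetical payload, only to show the call.
sample = '{"sessionId": "abc", "flowId": 12, "id": 345}'
job_id = extract_job_id(sample)
```

In a downstream task the text would come from ti.xcom_pull(task_ids='run_dataprep_flow_SANITIZED') rather than from a literal string.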

Question:

Is there a way, perhaps a sensor, in Airflow to monitor the Dataprep flow and detect its completion status?

Here's a sanitized version of my current task:

run_dataprep_flow = SimpleHttpOperator(
    task_id='run_dataprep_flow_SANITIZED',
    http_conn_id='http_dataprep',
    endpoint='/SANITIZED_ENDPOINT',
    method='POST',
    data=json.dumps({
        "runParameters": {
            "overrides": {
                "data": [
                    {"key": "var_key1", "value": "SANITIZED_VALUE"},
                    {"key": "var_key2", "value": "SANITIZED_VALUE"},
                    {"key": "var_key3", "value": "SANITIZED_VALUE"},
                    {"key": "var_key4", "value": "SANITIZED_VALUE"},
                    {"key": "var_key5", "value": "SANITIZED_VALUE"}
                ]
            }
        }
    }),
    headers={'SANITIZED_HEADER': 'SANITIZED_VALUE'},  # headers must be a dict, not a set
    do_xcom_push=True,
    dag=dag,
)
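
One possible direction is a follow-up task that polls the job status until it reaches a terminal state and fails the Airflow task otherwise. Below is a sketch of that polling logic in plain Python, kept independent of Airflow so it could be called from a PythonOperator. fetch_status is a hypothetical callable that would wrap a GET request to the job-status endpoint, and the status strings ("Complete", "Failed", "Canceled") are assumptions that should be checked against the Dataprep API documentation.

```python
import time

SUCCESS_STATES = {"Complete"}            # assumed status string
FAILURE_STATES = {"Failed", "Canceled"}  # assumed status strings


def wait_for_job(fetch_status, poke_interval=30, timeout=3600, sleep=time.sleep):
    """Poll fetch_status() until the job succeeds, fails, or times out.

    fetch_status: hypothetical callable returning the current status string.
    Raises RuntimeError on failure or timeout, which would mark the
    Airflow task as failed when called from a PythonOperator.
    """
    waited = 0
    while True:
        status = fetch_status()
        if status in SUCCESS_STATES:
            return status
        if status in FAILURE_STATES:
            raise RuntimeError("Dataprep job ended with status %s" % status)
        if waited >= timeout:
            raise RuntimeError("Timed out waiting for Dataprep job")
        sleep(poke_interval)
        waited += poke_interval
```

If HttpSensor is available in the installed Airflow version, the same check could probably be expressed as its response_check, returning True on the success state and raising an exception on a failure state.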

I'd appreciate any guidance or suggestions on how to handle this!
