I'm using airflow.operators.sensors.ExternalTaskSensor to make one DAG wait for another.

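from datetime import timedelta

from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor

# start_datetime, DAGS_FOLDER, minute and hour are defined elsewhere in my project
dag = DAG(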
    'dag2',
    default_args={
        'owner': 'Me',
        'depends_on_past': False,
        'start_date': start_datetime,
        'email': ['[email protected]'],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 2,
        'retry_delay': timedelta(minutes=10),
    },
    template_searchpath="%s/me/resources/" % DAGS_FOLDER,
    schedule_interval="{} {} * * *".format(minute, hour),
    max_active_runs=1
)

wait_for_dag1 = ExternalTaskSensor(
    task_id='wait_for_dag1',
    external_dag_id='dag1',
    external_task_id='dag1_task1',
    dag=dag
)

If something goes seriously wrong with the upstream DAG and it fails to complete within a given time period, I want the downstream DAG (the ExternalTaskSensor task) to fail as well, instead of hanging forever.

How can I add a timeout to ExternalTaskSensor?

I've looked through the documentation, but the sensor does not seem to have a timeout parameter or anything similar. What should I do?
https://airflow.readthedocs.io/en/stable/_modules/airflow/sensors/external_task_sensor.html

1 Answer

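ExternalTaskSensor does take a timeout argument, in seconds. It inherits the argument from BaseSensorOperator (https://airflow.apache.org/_modules/airflow/sensors/base_sensor_operator.html). If you pass timeout=60 when you instantiate the sensor, the task fails once it has been waiting for more than 60 seconds without the external task succeeding. The default timeout is 7 days (60*60*24*7 seconds), which is why the sensor appears to hang forever when the upstream DAG never completes.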
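
To make the timeout behaviour concrete, here is a minimal pure-Python sketch of the poke loop a sensor runs. It is a simplified illustration, not Airflow's actual code: run_sensor, SensorTimeout and FakeClock are hypothetical names made up for this example, with SensorTimeout standing in for Airflow's AirflowSensorTimeout. The fake clock lets the example run instantly instead of really sleeping.

```python
import time


class SensorTimeout(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowSensorTimeout."""


def run_sensor(poke, timeout, poke_interval,
               clock=time.monotonic, sleep=time.sleep):
    """Call poke() until it returns True or more than `timeout` seconds pass.

    Simplified model of a sensor's poke loop: the elapsed time is only
    checked after a failed poke, so the failure can land slightly after
    `timeout`, depending on `poke_interval`.
    """
    started = clock()
    while not poke():
        if clock() - started > timeout:
            raise SensorTimeout("sensor timed out after %s seconds" % timeout)
        sleep(poke_interval)
    return True


class FakeClock:
    """Deterministic clock so the example does not really wait."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


if __name__ == "__main__":
    clock = FakeClock()
    try:
        # The external task never succeeds, so poke() always returns False.
        run_sensor(lambda: False, timeout=60, poke_interval=10,
                   clock=clock.time, sleep=clock.sleep)
    except SensorTimeout as exc:
        print("failed at t=%s: %s" % (clock.now, exc))
```

In the DAG from the question, the equivalent is to add timeout=60 (and, if you want faster checks, poke_interval=10) to the ExternalTaskSensor(...) call. Pick a timeout that matches how long dag1 is realistically allowed to take.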