Airflow error with SqlSensor not recognising the postgres type of connection

I'm trying to use an SqlSensor task in my DAG, but I'm getting an 'Unknown hook type "postgres"' error. I want the sensor to re-check a database table to see whether new rows were added since the last daily run.

I'm running Airflow 2.8.1 on Ubuntu and have installed the apache-airflow-providers-postgres and apache-airflow-providers-common-sql. I'm trying to run an SqlSensor to check that new rows have been added to a table:

    from airflow.providers.common.sql.sensors.sql import SqlSensor

    # _success_criteria is defined elsewhere in the DAG file
    sql_sensor = SqlSensor(
        task_id="sql_sensor",
        conn_id="pg_conn",
        success=_success_criteria,
        sql="SELECT COUNT(*) FROM public.intradaytrades WHERE timestamp > CURRENT_DATE - INTERVAL '1 day';",
        mode="reschedule",
        fail_on_empty=True,
        timeout=60 * 60,  # 1 hour timeout for the sensor
        poke_interval=60 * 5,  # 5 minutes between pokes
    )

The pg_conn connection is set up, and tested, with the 'Postgres' connection type in Airflow Connections.

However, I'm getting this error when running the sensor task:

    [2024-02-04, 01:10:25 UTC] {base.py:83} INFO - Using connection ID 'pg_conn' for task execution.
    [2024-02-04, 01:10:25 UTC] {taskinstance.py:2698} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/var/lib/airflow/python_3_11_airflow/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
        result = execute_callable(context=context, **execute_callable_kwargs)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/var/lib/airflow/python_3_11_airflow/lib/python3.11/site-packages/airflow/sensors/base.py", line 265, in execute
        raise e
      File "/var/lib/airflow/python_3_11_airflow/lib/python3.11/site-packages/airflow/sensors/base.py", line 247, in execute
        poke_return = self.poke(context)
                      ^^^^^^^^^^^^^^^^^^
      File "/var/lib/airflow/python_3_11_airflow/lib/python3.11/site-packages/airflow/providers/common/sql/sensors/sql.py", line 93, in poke
        hook = self._get_hook()
               ^^^^^^^^^^^^^^^^
      File "/var/lib/airflow/python_3_11_airflow/lib/python3.11/site-packages/airflow/providers/common/sql/sensors/sql.py", line 84, in _get_hook
        hook = conn.get_hook(hook_params=self.hook_params)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/var/lib/airflow/python_3_11_airflow/lib/python3.11/site-packages/airflow/models/connection.py", line 363, in get_hook
        raise AirflowException(f'Unknown hook type "{self.conn_type}"')
    airflow.exceptions.AirflowException: Unknown hook type "postgres"

What could be causing this failure? I've tested pg_conn by using PostgresHook directly in a custom query task, and I can fetchall rows from the db.

Installed packages:

    apache-airflow==2.8.1
    apache-airflow-providers-celery==3.5.2
    apache-airflow-providers-common-io==1.2.0
    apache-airflow-providers-common-sql==1.10.1
    apache-airflow-providers-ftp==3.7.0
    apache-airflow-providers-github==2.5.1
    apache-airflow-providers-http==4.8.0
    apache-airflow-providers-imap==3.5.0
    apache-airflow-providers-microsoft-winrm==3.4.0
    apache-airflow-providers-postgres==5.10.0
    apache-airflow-providers-redis==3.6.0
    apache-airflow-providers-slack==8.6.0
    apache-airflow-providers-sqlite==3.7.0
    apache-airflow-providers-ssh==3.10.0