How to set up SFTPSensor in Airflow to react on any file appearing on the server?


I am pretty new to Airflow. I am trying to set up an SFTPSensor to watch a folder on an SFTP server for any file that appears. That sounds to me like the wildcard "*" in the file_pattern property:

import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.sftp.hooks.sftp import SFTPHook
from airflow.providers.sftp.sensors.sftp import SFTPSensor

args = {
    "owner": "My_company",
    "start_date": datetime(2022, 10, 17),
}

def get_list_of_files():
    # List whatever is currently in /in/ on the SFTP server.
    ftp_hook = SFTPHook(ftp_conn_id="My_company")
    files_list = ftp_hook.list_directory("/in/")
    logging.info("The list of files is the following:")
    logging.info(files_list)
    return files_list

dag = DAG(
    dag_id="Checking_SFTP_Server_with_sensor",
    default_args=args,
    schedule_interval="0 8 * * *",
    dagrun_timeout=timedelta(minutes=1),
    tags=["My_company"],
)

# Wait until at least one file matching file_pattern appears in /in/.
check_SFTP = SFTPSensor(
    task_id="check_SFTP",
    sftp_conn_id="My_company",
    path="/in/",
    file_pattern="*",
    poke_interval=15,
    timeout=60 * 5,
    dag=dag,
)

start = DummyOperator(task_id="start", dag=dag)

def createOrderProcessingTask(file):
    # One trigger task per file, handing the file name to the processing DAG.
    return TriggerDagRunOperator(
        task_id=f"process_order_{file}",
        trigger_dag_id="Processing_the_order",
        conf={"file_name": file},
        dag=dag,
    )

end = DummyOperator(task_id="end", dag=dag)

files = get_list_of_files()  # note: this runs at DAG-parse time, not at run time

check_SFTP >> start

for file in files:
    task = createOrderProcessingTask(file)
    start >> task >> end

But I can't get the file_pattern property to work. The DAG above breaks with the error:

Broken DAG: [/opt/airflow/dags/repo/dags/check_sftp_server_with_sensor.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 390, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 744, in __init__
    f"Invalid arguments were passed to {self.__class__.__name__} (task_id: {task_id}). "
airflow.exceptions.AirflowException: Invalid arguments were passed to SFTPSensor (task_id: check_SFTP). Invalid arguments were:
**kwargs: {'file_pattern': '*'}

What am I missing? Should I use a different approach for this problem?

1 Answer

Answered by Nico:

The argument name itself is fine; the problem is that the version of the SFTP provider you have installed is too old to know about it.

Have a look at the signature in the current documentation:

SFTPSensor(*, path, file_pattern='', newer_than=None, sftp_conn_id='sftp_default', **kwargs)

You'll see that certain arguments (path, file_pattern, newer_than and sftp_conn_id) are declared explicitly as keyword-only parameters. If you pass any other keyword arguments (task_id, poke_interval, timeout, dag, ...), they are packed into the catch-all **kwargs dict and forwarded on to BaseOperator.

Note that Python binds keyword arguments by name, not by position, so the order in which you pass them doesn't matter. The fact that file_pattern shows up inside **kwargs in your error means the SFTPSensor class you actually have installed doesn't declare that parameter at all.
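
A quick plain-Python sketch of that binding rule (a toy function, nothing Airflow-specific):

def sensor(*, path, file_pattern="", **kwargs):
    """Toy stand-in for SFTPSensor's keyword-only signature."""
    return file_pattern, kwargs

# Order is irrelevant: file_pattern binds by name either way.
print(sensor(file_pattern="*", path="/in/"))  # ('*', {})
print(sensor(path="/in/", file_pattern="*"))  # ('*', {})

# Only names the function does not declare end up in **kwargs.
print(sensor(path="/in/", task_id="check"))   # ('', {'task_id': 'check'})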

file_pattern was only added to SFTPSensor in apache-airflow-providers-sftp 4.1.0. On older releases of the provider the sensor doesn't declare it, so the unknown name falls through to BaseOperator, which rejects it with exactly the error you got:

Invalid arguments were: **kwargs: {'file_pattern': '*'}

Upgrade the provider package, e.g. pip install "apache-airflow-providers-sftp>=4.1.0", and the DAG will import cleanly. Once it does, file_pattern is matched with fnmatch, so "*" matches any file name in the directory.

Hope this helps

Summary: keyword arguments are bound by name, so if one of them shows up inside **kwargs in an "Invalid arguments" error, the installed class simply doesn't declare that parameter; check your provider version first.
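
If you want to confirm which provider version you are actually running, one quick way from Python (importlib.metadata needs Python 3.8+; on the 3.7 image in your traceback, running pip show apache-airflow-providers-sftp from the shell gives the same information):

from importlib.metadata import version  # Python 3.8+

# file_pattern requires the SFTP provider at 4.1.0 or newer.
print(version("apache-airflow-providers-sftp"))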