I am pretty new to Airflow. I am trying to set up an SFTPSensor to watch a folder on the SFTP server for any file to appear. That sounds to me like the wildcard pattern "*" in the file_pattern property:
import airflow
import logging
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.sftp.hooks.sftp import SFTPHook
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.sensors.python import PythonSensor
from datetime import datetime, timedelta

args = {
    "owner": "My_company",
    "start_date": datetime(2022, 10, 17)}

def get_list_of_files():
    ftp_hook = SFTPHook(ftp_conn_id="My_company")
    files_list = ftp_hook.list_directory("/in/")
    logging.info("The list of files is the following:")
    logging.info(files_list)
    return files_list

dag = DAG(
    dag_id="Checking_SFTP_Server_with_sensor",
    default_args=args,
    schedule_interval="0 8 * * *",
    dagrun_timeout=timedelta(minutes=1),
    tags=['My_company'])

check_SFTP = SFTPSensor(task_id="check_SFTP",
                        sftp_conn_id="My_company",
                        path="/in/",
                        file_pattern="*",
                        poke_interval=15,
                        timeout=60 * 5,
                        dag=dag)

start = DummyOperator(task_id='start', dag=dag)

def createOrderProcessingTask(file):
    return TriggerDagRunOperator(
        task_id=f'process_order_{file}',
        trigger_dag_id="Processing_the_order",
        conf={"file_name": file},
        dag=dag)

end = DummyOperator(task_id='end', dag=dag)

files = get_list_of_files()
check_SFTP >> start
for file in files:
    task = createOrderProcessingTask(file)
    start >> task >> end
But I can't get that file_pattern property to work. The DAG above breaks with the error:
Broken DAG: [/opt/airflow/dags/repo/dags/check_sftp_server_with_sensor.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 390, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 744, in __init__
    f"Invalid arguments were passed to {self.__class__.__name__} (task_id: {task_id}). "
airflow.exceptions.AirflowException: Invalid arguments were passed to SFTPSensor (task_id: check_SFTP). Invalid arguments were:
**kwargs: {'file_pattern': '*'}
What am I missing? Should I use a different approach for this problem?
You probably mixed up the order of your keyword arguments.

Have a look at the signature: you'll see that certain arguments (path, file_pattern, newer_than and sftp_conn_id) have their own explicit parameter. If you pass any other keyword arguments, they are packed into the catch-all **kwargs dict.

In your case, you are passing task_id as your first argument. Since it's not an explicit parameter, Python assumes that task_id and all following arguments should be packed into **kwargs. SFTPSensor doesn't expect **kwargs to contain the explicit argument file_pattern, and thus it throws the error.

Hope this helps!
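To make the explicit-vs-catch-all split concrete, here is a minimal hypothetical stub with the same shape as the signature described above (the parameter names come from that list; the defaults are assumptions, not the provider's real code):

```python
# Hypothetical stub mirroring the shape of the SFTPSensor signature
# discussed above; the defaults are assumptions, not the provider's code.
class SFTPSensorStub:
    def __init__(self, *, path, file_pattern="", newer_than=None,
                 sftp_conn_id="sftp_default", **kwargs):
        # These four names have their own explicit parameters ...
        self.path = path
        self.file_pattern = file_pattern
        self.newer_than = newer_than
        self.sftp_conn_id = sftp_conn_id
        # ... while every other keyword argument lands in the catch-all dict.
        self.kwargs = kwargs

sensor = SFTPSensorStub(task_id="check_SFTP", path="/in/", file_pattern="*")
# task_id has no explicit parameter here, so it is packed into self.kwargs.
```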
Summary: When calling a function, kwargs come after explicit arguments.
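For completeness, the error text itself comes from the base class rejecting unknown names left over in **kwargs. A hypothetical stand-in (not Airflow's actual code) reproduces the behaviour seen in the traceback: when the sensor class being called has no explicit file_pattern parameter, that argument falls through to the base class and is rejected:

```python
class BaseOperatorStub:
    # Hypothetical stand-in for Airflow's BaseOperator: it raises on any
    # keyword argument it does not recognise, like the traceback above.
    def __init__(self, task_id, **kwargs):
        if kwargs:
            raise TypeError(
                f"Invalid arguments were passed to {type(self).__name__} "
                f"(task_id: {task_id}). Invalid arguments were: **kwargs: {kwargs}"
            )
        self.task_id = task_id

class SensorWithoutFilePattern(BaseOperatorStub):
    # A sensor whose __init__ does not declare file_pattern explicitly,
    # so file_pattern falls through to the base class via **kwargs.
    def __init__(self, *, path, sftp_conn_id="sftp_default", **kwargs):
        super().__init__(**kwargs)
        self.path = path
        self.sftp_conn_id = sftp_conn_id

try:
    SensorWithoutFilePattern(task_id="check_SFTP", path="/in/", file_pattern="*")
except TypeError as exc:
    print(exc)  # mentions {'file_pattern': '*'}, just like the real error
```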