I am currently working on creating Airflow DAGs that perform a set of tasks on every file in a given directory. Below is one version of such a DAG; it involves multiple steps.
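For reference, the snippet assumes roughly the following imports. SOURCE, TEST and SFTP_INBOUND are path constants defined elsewhere in my project, and LocalFileOperator is a custom operator, so treat those names as placeholders:

import os
from datetime import datetime

from airflow import DAG
from airflow.decorators import task, task_group
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator

# Placeholders for values defined elsewhere in my project:
# SOURCE, TEST, SFTP_INBOUND are path constants and
# LocalFileOperator is a custom operator.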
def write_log_file(source_file: str, record_content: str) -> str:
    with open(source_file, "a") as log_file:
        log_file.write(f"{record_content}\n")
    return f"logged {record_content} in file {os.path.basename(source_file)} successfully"
# DAG definition
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

with DAG(
    'my_inbound_files_dag',
    default_args=default_args,
    description='TEST DAG for processing inbound files',
    schedule_interval='@daily',
    catchup=False,
) as dag:

    purge_processing = LocalFileOperator(
        task_id="purge_processing",
        operation="purge",
        source=SFTP_INBOUND,
        days_threshold=550,
    )
    @task_group(group_id="process_inbound_files_group")
    def processing_inbound_files(inbound_file_path, **kwargs):

        @task
        def write_log_file(source_file: str):
            print(source_file)
            write_log_file_op = PythonOperator(
                task_id="write_log_file",
                python_callable=write_log_file,
                op_kwargs={
                    "source_file": TEST,
                    "record_content": f"{source_file}\n",
                },
            )
            write_log_file_op.execute(context=kwargs)

        @task
        def write_log_file_next(source_file: str):
            print(source_file)
            write_log_file_next = PythonOperator(
                task_id="write_log_file_next",
                python_callable=write_log_file,
                op_kwargs={
                    "source_file": TEST,
                    "record_content": f"{source_file}2\n",
                },
            )
            write_log_file_next.execute(context=kwargs)

        write_log_file(inbound_file_path) >> write_log_file_next(inbound_file_path)
    process_inbound_files_group_op = processing_inbound_files.expand(
        inbound_file_path=[os.path.join(SOURCE, file_name) for file_name in os.listdir(SOURCE)]
    )

    exit_job = DummyOperator(task_id="exit_job")

    purge_processing >> process_inbound_files_group_op >> exit_job
I am not sure what is happening: all the tasks show as completed in the Airflow UI, but the work inside them does not actually run. I tried to debug it, but nothing seems to happen. Any assistance on this would be really helpful.
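To make the intent clearer, here is a minimal sketch of what I expect each mapped group to do per file. It is only a sketch with placeholder paths (LOG_FILE, SOURCE), the log writing is done directly inside the TaskFlow tasks, and it assumes Airflow 2.5+ so a task group can be expanded:

import os
from datetime import datetime

from airflow import DAG
from airflow.decorators import task, task_group

LOG_FILE = "/tmp/inbound_files.log"  # placeholder for my real log file constant
SOURCE = "/tmp/inbound"              # placeholder for my real source directory

with DAG(
    "my_inbound_files_sketch",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    @task_group(group_id="process_inbound_files_group")
    def processing_inbound_files(inbound_file_path):

        @task
        def write_log_file(source_file: str):
            # Append the file name to the log file directly in the task.
            with open(LOG_FILE, "a") as log_file:
                log_file.write(f"{source_file}\n")
            return source_file

        @task
        def write_log_file_next(source_file: str):
            # Second step for the same file.
            with open(LOG_FILE, "a") as log_file:
                log_file.write(f"{source_file}2\n")

        # Passing the first task's output sets the dependency between them.
        write_log_file_next(write_log_file(inbound_file_path))

    processing_inbound_files.expand(
        inbound_file_path=[os.path.join(SOURCE, f) for f in os.listdir(SOURCE)]
    )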