How to run the same DAG twice in a single run in Airflow


I am absolutely new to Airflow. I have a requirement where I have to run two EMR jobs. Currently I have a Python script that depends on some input files; if they are present, it triggers an EMR job.

My new requirement is that I will have two different input files (of the same type), and these two files will be the inputs to the EMR jobs. In both cases Spark will do the same thing; only the input file is different.

# import path varies by Airflow version; on Airflow 2 with the Amazon provider:
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator

create_job_workflow = EmrCreateJobFlowOperator(
    task_id='some-task',
    job_flow_overrides=job_flow_args,
    aws_conn_id=aws_conn,
    emr_conn_id=emr_conn,
    dag=dag
)

How can I achieve this, running the same DAG twice with only the input file inside spark-submit changed? Basically, whenever I 'trigger DAG', it should take the two different input files and trigger two different EMR jobs on two different EMR clusters. Or can anyone please suggest a best practice for doing this? Or is it possible by setting max_active_runs=2?


There are 3 answers

Fedor (BEST ANSWER)

Another option, I believe, would be to use SubDAGs.

You would redefine your main processing flow as a SubDAG and then create a new 'general' DAG that just has two SubDagOperators, one per run, each with a different input filename as a parameter. Please refer to the link above for details and an example; a sketch follows below.
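A minimal sketch of that layout, assuming Airflow 2 with the Amazon provider installed; the DAG ids, file paths, connection ids, and the make_emr_subdag helper are illustrative, not from the original post (the spark-submit step that actually consumes input_file is left out for brevity):

from datetime import datetime

from airflow import DAG
from airflow.operators.subdag import SubDagOperator  # airflow.operators.subdag_operator on 1.10.x
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator


def make_emr_subdag(parent_dag_id, child_dag_id, input_file, default_args):
    # Build a small DAG whose single task launches an EMR job flow for one input file
    subdag = DAG(
        dag_id=f"{parent_dag_id}.{child_dag_id}",
        default_args=default_args,
        schedule_interval=None,
    )
    EmrCreateJobFlowOperator(
        task_id="create_job_flow",
        # the spark-submit step reading input_file would go under "Steps" in these overrides
        job_flow_overrides={"Name": f"cluster-{child_dag_id}"},
        aws_conn_id="aws_default",
        emr_conn_id="emr_default",
        dag=subdag,
    )
    return subdag


default_args = {"start_date": datetime(2021, 1, 1)}

with DAG("emr_parent", default_args=default_args, schedule_interval=None) as dag:
    for name, input_file in [("run_a", "s3://my-bucket/input_a.csv"),
                             ("run_b", "s3://my-bucket/input_b.csv")]:
        # one SubDagOperator per input file; both wrap the same processing flow
        SubDagOperator(
            task_id=name,
            subdag=make_emr_subdag("emr_parent", name, input_file, default_args),
        )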

Alejandro Kaspar

The best practice would be to have two different tasks for it. Setting max_active_runs=2 will just limit the number of concurrent DAG runs to 2. You can use any data structure to hold the config for your tasks, iterate over it, and build the tasks from each entry, as sketched below.
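A minimal sketch of that idea, assuming Airflow 2 with the Amazon provider; the file paths, connection ids, and job-flow layout are assumptions, not taken from the question:

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator

# config structure: one entry per EMR job, differing only in the input file
INPUT_FILES = {
    "job_a": "s3://my-bucket/input_a.csv",
    "job_b": "s3://my-bucket/input_b.csv",
}

with DAG("emr_two_inputs", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    for task_name, input_file in INPUT_FILES.items():
        # two independent tasks, each spinning up its own EMR cluster
        EmrCreateJobFlowOperator(
            task_id=f"create_job_flow_{task_name}",
            job_flow_overrides={
                "Name": f"cluster-{task_name}",
                "Steps": [{
                    "Name": "spark-step",
                    "ActionOnFailure": "TERMINATE_CLUSTER",
                    "HadoopJarStep": {
                        "Jar": "command-runner.jar",
                        # only the input file differs between the two jobs
                        "Args": ["spark-submit", "s3://my-bucket/job.py", input_file],
                    },
                }],
            },
            aws_conn_id="aws_default",
            emr_conn_id="emr_default",
        )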

Another thing you can do:

You can receive the filename as the payload of your DAG run and access it like: context['dag_run'].conf.get('filename')

Then re-trigger the same DAG with a TriggerDagRunOperator, updating the payload with the other file.
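A rough sketch of that pattern; the DAG id, the 'filename' key, and the processing callable are illustrative. In practice you would guard the re-trigger (for example, only fire it when a next file is known) so the DAG does not keep re-triggering itself:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


def process(**context):
    # the filename arrives as the payload passed when the DAG was triggered
    filename = context["dag_run"].conf.get("filename")
    print(f"submitting EMR job for {filename}")


with DAG("emr_single_file", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    run_job = PythonOperator(task_id="process_file", python_callable=process)

    # re-trigger the same DAG, this time with the second file as the payload
    trigger_next = TriggerDagRunOperator(
        task_id="trigger_second_run",
        trigger_dag_id="emr_single_file",
        conf={"filename": "s3://my-bucket/input_b.csv"},
    )

    run_job >> trigger_next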

Fedor

I found another approach, based on Airflow TaskGroups.

The link gives a very detailed explanation. Here is a brief summary:

Airflow TaskGroups have been introduced to make your DAG visually cleaner and easier to read. They are meant to replace SubDAGs, which were the historic way of grouping your tasks. The problem with SubDAGs is that they are much more than that. They bring a lot of complexity as you need to create a DAG in a DAG, import the SubDagOperator, define the parameters properly, and so on. Therefore, SubDAGs are going to be deprecated... Ultimately, TaskGroups help you to better organize your tasks and maintain your DAGs without the hassle of SubDAGs.
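A minimal TaskGroup sketch for the same two-file scenario, assuming Airflow 2.x; the group ids and file paths are illustrative, and the per-file spark-submit step would go into job_flow_overrides as in the earlier sketches:

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator
from airflow.utils.task_group import TaskGroup

with DAG("emr_taskgroups", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    for name, input_file in [("file_a", "s3://my-bucket/input_a.csv"),
                             ("file_b", "s3://my-bucket/input_b.csv")]:
        # one TaskGroup per input file; groups only organize the graph visually
        with TaskGroup(group_id=f"emr_{name}"):
            EmrCreateJobFlowOperator(
                task_id="create_job_flow",
                # the spark-submit step reading input_file goes here, as above
                job_flow_overrides={"Name": f"cluster-{name}"},
                aws_conn_id="aws_default",
                emr_conn_id="emr_default",
            )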