Executing the same operator in Cloud Composer as multiple tasks

I have a PythonOperator in Airflow that gets executed using Cloud Composer:

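from airflow import DAG
from airflow.operators.python import PythonOperator

# config, default_args and generate_data are defined elsewhere in the DAG file.
with DAG(
    dag_id = config['dag_id'],
    schedule_interval = config['schedule_interval'],
    default_args = default_args
    ) as dag:

    generate_data_task = PythonOperator(
        task_id = 'generate_dummy_data',
        python_callable = generate_data,
        dag = dag
    )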

The generate_data() function writes a CSV file with a randomly generated, unique name to a bucket, with some data in it. Executed as is, it works great, but I want to execute the same task multiple times in parallel. If I specify that it should run 10 times in parallel, I expect 10 files to be written to the bucket. I have tried setting concurrency and task_concurrency, but the result is the same.
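For reference, a minimal sketch of what such a callable might look like. The question does not show generate_data(), so everything here is an assumption: a local temporary directory (OUTPUT_DIR, a hypothetical name) stands in for the bucket, and the column names and row count are made up for illustration.

```python
import csv
import random
import tempfile
import uuid
from pathlib import Path

# Assumption: a local temporary directory stands in for the real bucket.
OUTPUT_DIR = Path(tempfile.mkdtemp(prefix="dummy_bucket_"))


def generate_data(output_dir: Path = OUTPUT_DIR, rows: int = 5) -> Path:
    """Write a small CSV of random values under a unique name and return its path."""
    # uuid4 gives each call its own file name, so parallel runs do not collide.
    path = output_dir / f"dummy_{uuid.uuid4().hex}.csv"
    with path.open("w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(["id", "value"])  # hypothetical header
        for i in range(rows):
            writer.writerow([i, random.random()])
    return path
```

Because the file name is generated inside the function, calling it ten times, whether sequentially or from ten parallel task instances, should leave ten distinct files behind.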

Is this achievable using Airflow on top of Cloud Composer?

There is 1 answer

Answer by Kombajn zbożowy (best answer)

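Yes. concurrency and task_concurrency only cap how many task instances may run at once; they do not create additional ones. To get several instances of the same task, use dynamic task mapping (available since Airflow 2.3), which creates one task instance per element of the list passed to expand():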

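    # partial() holds the arguments shared by every mapped task instance.
    generate_data_task = PythonOperator.partial(
        task_id = 'generate_dummy_data',
        python_callable = generate_data,
        dag = dag
    # expand() creates one instance per element: ten empty argument lists.
    ).expand(op_args=[[]] * 10)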
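To see why [[]] * 10 yields ten runs, here is a plain-Python illustration that does not use Airflow at all. It only mimics the fan-out sequentially: each element of the list is treated as the positional arguments for one call. The generate_data() below is a hypothetical stand-in that just returns a unique file name.

```python
import uuid


def generate_data():
    # Hypothetical stand-in for the real callable: returns a unique file name.
    return f"dummy_{uuid.uuid4().hex}.csv"


# The value passed to expand(op_args=...): ten empty positional-argument lists.
mapped_op_args = [[]] * 10

# Roughly what the ten mapped task instances do, run one after another here:
# each element is unpacked as the positional arguments of one call.
results = [generate_data(*args) for args in mapped_op_args]

print(len(mapped_op_args), "argument lists,", len(set(results)), "distinct file names")
```

In Airflow the ten mapped instances are scheduled as separate task instances, so how many actually run at the same time still depends on the environment's parallelism and concurrency limits.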