I'm evaluating Airflow 1.9.0 for our distributed orchestration needs (using CeleryExecutor and RabbitMQ), and I am seeing something strange.
I made a DAG that has three stages:
- start
- fan out and run N tasks concurrently
- finish
N can be large, maybe up to 10K. I would expect to see all N tasks get dumped onto the RabbitMQ queue when stage 2 begins. Instead I am seeing only a few hundred added at a time; as the workers process tasks and the queue shrinks, more get added to Celery/Rabbit. Eventually it does finish, but I would really prefer that it dump ALL the work (all 10K tasks) into Celery immediately, for two reasons:
1. The current way makes the scheduler long-lived and stateful. The scheduler might die after only 5K tasks have completed, in which case the remaining 5K would never get added (I verified this).
2. I want to use the size of the Rabbit queue as a metric to trigger autoscaling events that add more workers, so I need a true picture of how much outstanding work remains (10K, not a few hundred).
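To make the autoscaling idea concrete, here is a minimal sketch of what I have in mind: read the ready-message count from the RabbitMQ management HTTP API and map it to a worker count. The host, port, queue name (`default`), credentials, and the `tasks_per_worker` ratio are all assumptions for illustration, not anything Airflow provides.

```python
# Sketch: scale Celery workers from RabbitMQ queue depth (all names/values
# here are illustrative assumptions, not part of Airflow itself).
import base64
import json
import math
import urllib.request


def queue_depth(host='localhost', port=15672, vhost='%2F', queue='default',
                user='guest', password='guest'):
    """Return the number of ready messages via the RabbitMQ management API."""
    url = 'http://%s:%d/api/queues/%s/%s' % (host, port, vhost, queue)
    req = urllib.request.Request(url)
    token = base64.b64encode(('%s:%s' % (user, password)).encode()).decode()
    req.add_header('Authorization', 'Basic ' + token)
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)['messages_ready']


def desired_workers(depth, tasks_per_worker=50, min_workers=2, max_workers=100):
    """Scale worker count linearly with outstanding work, within bounds."""
    return max(min_workers, min(max_workers, math.ceil(depth / tasks_per_worker)))
```

The point is that this only works if `depth` reflects the true backlog (10K), not the few hundred the scheduler has trickled in so far.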
I assume the scheduler has some kind of throttle that keeps it from dumping all 10K messages simultaneously? If so, is this configurable?
FYI I have already set “parallelism” to 10K in the airflow.cfg
Here is my test DAG:
# This DAG tests how well Airflow fans out.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('fan_out', default_args=default_args, schedule_interval=None)

num_tasks = 10000

starting = BashOperator(
    task_id='starting',
    bash_command='echo starting',
    dag=dag)

all_done = BashOperator(
    task_id='all_done',
    bash_command='echo all done',
    dag=dag)

# Fan out: every say_hello task runs after 'starting' and before 'all_done'.
for i in range(num_tasks):
    task = BashOperator(
        task_id='say_hello_' + str(i),
        bash_command='echo hello world',
        dag=dag)
    task.set_upstream(starting)
    task.set_downstream(all_done)
Thanks to those who suggested other concurrency settings. Through trial and error, I learned that I need to set all three of these:
With only these two enabled, I can get to 10K, but it is very slow, adding only 100 new tasks in bursts every 30 seconds, in a stair-step fashion:
If I enable only these two instead, I see the same stair-step pattern, with 128 added every 30 seconds:
But if I set all three, it does add 10K to the queue in one shot.
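For reference, assuming the three knobs in question are the standard concurrency settings in Airflow 1.9's `[core]` section, the airflow.cfg fragment would look something like this (values illustrative, comments mine):

```ini
[core]
# Max task instances running concurrently across the whole installation
parallelism = 10000
# Max task instances running concurrently within a single DAG
dag_concurrency = 10000
# Max task instances that may run without being assigned to an explicit pool
non_pooled_task_slot_count = 10000
```

The scheduler queues up to the smallest of these limits each loop, which would explain the stair-step bursts when only some of them are raised.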