I have an Airflow DAG with a dynamically mapped task. We run quite heavy loads there (>4k mapped task instances). For this DAG, max_active_tasks is set to 32, and a full run may take a couple of days. When the DAG starts, everything looks fine: I see pods regularly spinning up and down, capping out at 32 concurrent tasks.
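For context, here is a simplified sketch of the DAG (task logic omitted, names are placeholders; we run on the KubernetesExecutor, which is why tasks show up as pods):

```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    dag_id="my_dag_name",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
    max_active_tasks=32,  # cap on concurrently running task instances for this DAG
)
def my_dag():
    @task
    def generate_items():
        # in reality this returns well over 4000 items
        return list(range(4000))

    @task
    def process_item(item):
        # heavy work; each mapped task instance runs in its own pod
        ...

    # dynamic task mapping: one mapped task instance per item
    process_item.expand(item=generate_items())


my_dag()
```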
However, after several hours of work the tasks stop being queued and stay in the scheduled state. At some point there is no active task pod left in the cluster. When I restart the schedulers, everything goes back to normal and tasks start being queued again. That works as a workaround, but I need a more stable solution.
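For reference, the state pile-up can be confirmed directly against the metadata DB with a quick ORM snippet like this (a sketch for Airflow 2.x, run from a scheduler pod, dag_id as above):

```python
from airflow.models import TaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import TaskInstanceState

# Count task instances per state for the affected DAG: when the problem hits,
# "scheduled" keeps growing while "queued" and "running" drop to zero.
with create_session() as session:
    for state in (
        TaskInstanceState.SCHEDULED,
        TaskInstanceState.QUEUED,
        TaskInstanceState.RUNNING,
    ):
        count = (
            session.query(TaskInstance)
            .filter(
                TaskInstance.dag_id == "my_dag_name",
                TaskInstance.state == state,
            )
            .count()
        )
        print(f"{state.value}: {count}")
```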
I'm confused about why this can happen. Where should I start my investigation?
The scheduler logs do not contain anything suspicious, at least not to me. There are occasional warnings of these kinds:
WARNING - Killing DAGFileProcessorProcess
DETAIL: Key (dag_id)=(my_dag_name) already exists.
[SQL: INSERT INTO serialized_dag (dag_id, fileloc, fileloc_hash, data, data_compressed, last_updated, dag_hash, processor_subdir) VALUES (%(dag_id)s, %(fileloc)s, %(fileloc_hash)s, %(data)s, %(data_compressed)s, %(last_updated)s, %(dag_hash)s, %(processor_subdir)s)]
{manager.py:543} INFO - DAG my_dag_name is missing and will be deactivated.
{manager.py:553} INFO - Deactivated 1 DAGs which are no longer present in file.
{manager.py:557} INFO - Deleted DAG my_dag_name in serialized_dag table
However, I see these messages evenly distributed across the logs, even when everything works fine.
Also, here is what is definitely not an issue:
- Other DAGs running in parallel. This behaviour happens even when my DAG is the only one running.
Any idea where to start looking?