I'm running Airflow (2.6.3) locally in a test environment, which works fine. However, I now need to add the ability to use the KubernetesExecutor to run tasks in a remote Kubernetes cluster (i.e. not on the same machine).
My airflow.cfg file looks like this:
[core]
sql_alchemy_conn = postgresql+psycopg2://aflow:flow@postgres/airflow
airflow_home = /opt/airflow
executor = KubernetesExecutor
scheduler_health_check_threshold = 240
orphaned_tasks_check_interval = 300
dag_file_processor_timeout = 240
[celery]
result_backend = postgresql+psycopg2://aflow:aflow@postgres/airflow
broker_url = redis://redis:6379/0
[kubernetes_executor]
multi_namespace_mode = False
namespace = etl
pod_template_file = /opt/airflow/pod_templates/pod_template_file.yaml
worker_container_repository = anexpensivecrmgnt.azurecr.io/data/etl/airflow
worker_container_tag = latest
config_file = /opt/airflow/.kube/config
in_cluster = False
cluster_context = aks-cluster
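To rule out the config file simply not being read, these are the diagnostic commands I know of (assuming the airflow CLI is on PATH; note that AIRFLOW__SECTION__KEY environment variables silently override airflow.cfg):

```shell
# If this prints a value, an environment variable is overriding airflow.cfg.
env | grep '^AIRFLOW__CORE__EXECUTOR' || echo "no AIRFLOW__CORE__EXECUTOR override"

# Ask Airflow which executor it will actually load (file + env combined).
airflow config get-value core executor

# Confirm which AIRFLOW_HOME (and therefore which airflow.cfg) is in effect.
airflow info | grep -i airflow_home
```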
When I trigger a task, it is queued and never picked up.

Example task:
from datetime import datetime
from time import sleep

from airflow import DAG
from airflow.decorators import task

from etls.utils import kubernetes_pod_params

with DAG(
    dag_id="test_manual_triggered_dag",
    start_date=datetime(2023, 6, 10),
    schedule=None,
    catchup=False,
    default_args=None,
) as test_manual_triggered_dag:

    @task(executor_config=kubernetes_pod_params(), queue="kubernetes")
    def ping() -> None:
        sleep(120)
        print("ping")

    ping_ = ping()

    @task(executor_config=kubernetes_pod_params(), queue="kubernetes")
    def pong() -> None:
        sleep(120)
        print("pong")

    pong_ = pong()
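kubernetes_pod_params() is a helper in etls/utils.py and is not shown above; it returns an executor_config dict along these lines (a simplified sketch, not the exact helper — the resource values are illustrative only):

```python
# Hypothetical sketch of kubernetes_pod_params(); the real helper lives in
# etls/utils.py. Follows the standard Airflow 2.x executor_config pattern,
# where "pod_override" is a V1Pod built from the kubernetes client models.
from kubernetes.client import models as k8s


def kubernetes_pod_params() -> dict:
    return {
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",  # must be "base" to override the task container
                        resources=k8s.V1ResourceRequirements(
                            requests={"cpu": "500m", "memory": "512Mi"},
                        ),
                    )
                ],
            ),
        ),
    }
```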
Looking at the scheduler logs, I see no output from kubernetes_executor.py at all; the KubernetesExecutor is never even loaded (the log shows CeleryExecutor instead):
2023-12-04 15:46:30 ____ |__( )_________ __/__ /________ __
2023-12-04 15:46:30 ____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
2023-12-04 15:46:30 ___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
2023-12-04 15:46:30 _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
2023-12-04 15:46:30 [2023-12-04T15:46:30.244+0000] {executor_loader.py:114} INFO - Loaded executor: CeleryExecutor
2023-12-04 15:46:30 [2023-12-04T15:46:30.346+0000] {scheduler_job_runner.py:788} INFO - Starting the scheduler
2023-12-04 15:46:30 [2023-12-04T15:46:30.347+0000] {scheduler_job_runner.py:795} INFO - Processing each file at most -1 times
2023-12-04 15:46:30 [2023-12-04T15:46:30.361+0000] {manager.py:165} INFO - Launched DagFileProcessorManager with pid: 34
2023-12-04 15:46:30 [2023-12-04T15:46:30.363+0000] {scheduler_job_runner.py:1553} INFO - Resetting orphaned tasks for active dag runs
2023-12-04 15:46:30 [2023-12-04T15:46:30.372+0000] {settings.py:60} INFO - Configured default timezone Timezone('UTC')
2023-12-04 15:47:34 [2023-12-04T15:47:34.390+0000] {scheduler_job_runner.py:411} INFO - 1 tasks up for execution:
2023-12-04 17:31:13 [2023-12-04T17:31:13.351+0000] {scheduler_job_runner.py:411} INFO - 1 tasks up for execution:
2023-12-04 17:31:13 <TaskInstance: test_manual_triggered_dag.ping manual__2023-12-04T01:13:57.864268+00:00 [scheduled]>
2023-12-04 17:31:13 [2023-12-04T17:31:13.351+0000] {scheduler_job_runner.py:476} INFO - DAG test_manual_triggered_dag has 0/16 running and queued tasks
2023-12-04 17:31:13 [2023-12-04T17:31:13.351+0000] {scheduler_job_runner.py:587} INFO - Setting the following tasks to queued state:
2023-12-04 17:31:13 <TaskInstance: test_manual_triggered_dag.ping manual__2023-12-04T01:13:57.864268+00:00 [scheduled]>
2023-12-04 17:31:13 [2023-12-04T17:31:13.353+0000] {scheduler_job_runner.py:625} INFO - Sending TaskInstanceKey(dag_id='test_manual_triggered_dag', task_id='ping', run_id='manual__2023-12-04T01:13:57.864268+00:00', try_number=1, map_index=-1) to executor with priority 3 and queue kubernetes
2023-12-04 17:31:13 [2023-12-04T17:31:13.353+0000] {base_executor.py:147} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'test_manual_triggered_dag', 'ping', 'manual__2023-12-04T01:13:57.864268+00:00', '--local', '--subdir', 'DAGS_FOLDER/etls/yet_another_test_etl.py']
2023-12-04 17:31:13 [2023-12-04T17:31:13.376+0000] {scheduler_job_runner.py:677} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='test_manual_triggered_dag', task_id='ping', run_id='manual__2023-12-04T01:13:57.864268+00:00', try_number=1, map_index=-1)
2023-12-04 17:31:13 [2023-12-04T17:31:13.382+0000] {scheduler_job_runner.py:703} INFO - Setting external_id for <TaskInstance: test_manual_triggered_dag.ping manual__2023-12-04T01:13:57.864268+00:00 [queued]> to ec6a9e7d-4820-49a9-9a11-e1fe379d5f45
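One thing I'm aware of is that Airflow gives AIRFLOW__{SECTION}__{KEY} environment variables precedence over airflow.cfg, so a stray AIRFLOW__CORE__EXECUTOR=CeleryExecutor (e.g. baked into a Docker image or compose file) would produce exactly the "Loaded executor: CeleryExecutor" line above. A minimal stdlib sketch of that precedence rule (resolve_option is a hypothetical helper for illustration, not Airflow's API):

```python
import configparser
import os


def resolve_option(cfg_text: str, section: str, key: str) -> str:
    """Illustrates Airflow's precedence: env var beats airflow.cfg."""
    env_var = f"AIRFLOW__{section.upper()}__{key.upper()}"
    if env_var in os.environ:  # the environment variable wins
        return os.environ[env_var]
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    return parser.get(section, key)  # fall back to the file


cfg = "[core]\nexecutor = KubernetesExecutor\n"
os.environ["AIRFLOW__CORE__EXECUTOR"] = "CeleryExecutor"
print(resolve_option(cfg, "core", "executor"))  # CeleryExecutor
```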