Airflow using remote kubernetes cluster with KubernetesExecutor fails to schedule


I'm running Airflow (2.6.3) locally in a test environment, which works fine. However, I now need to add the ability to use the KubernetesExecutor to execute tasks in a remote Kubernetes cluster (not on the same machine).

My airflow.cfg file looks like this:

[core]
sql_alchemy_conn = postgresql+psycopg2://aflow:flow@postgres/airflow
airflow_home = /opt/airflow
executor = KubernetesExecutor
scheduler_health_check_threshold = 240
orphaned_tasks_check_interval = 300
dag_file_processor_timeout = 240

[celery]
result_backend = postgresql+psycopg2://aflow:aflow@postgres/airflow
broker_url = redis://redis:6379/0

[kubernetes_executor]
multi_namespace_mode = False
namespace = etl
pod_template_file = /opt/airflow/pod_templates/pod_template_file.yaml
worker_container_repository = anexpensivecrmgnt.azurecr.io/data/etl/airflow
worker_container_tag = latest
config_file = /opt/airflow/.kube/config
in_cluster = False
cluster_context = aks-cluster

When I run a task, it is queued and never picked up.

Example DAG:

from datetime import datetime
from time import sleep

from airflow import DAG
from airflow.decorators import task

# Project-specific helper (definition not shown) whose return value is
# passed to each task as executor_config.
from etls.utils import kubernetes_pod_params

with DAG(
    dag_id="test_manual_triggered_dag",
    start_date=datetime(2023, 6, 10),
    schedule=None,
    catchup=False,
    default_args=None,
) as test_manual_triggered_dag:

    @task(executor_config=kubernetes_pod_params(), queue="kubernetes")
    def ping() -> None:
        sleep(120)
        print("ping")

    @task(executor_config=kubernetes_pod_params(), queue="kubernetes")
    def pong() -> None:
        sleep(120)
        print("pong")

    # Instantiate both tasks at DAG level so they are registered with the DAG.
    ping_ = ping()
    pong_ = pong()

Looking at the scheduler logs, I cannot see any output from kubernetes_executor.py. The KubernetesExecutor is not even loaded; the scheduler reports that it loaded the CeleryExecutor instead:

2023-12-04 15:46:30  ____    |__( )_________  __/__  /________      __
2023-12-04 15:46:30 ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
2023-12-04 15:46:30 ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
2023-12-04 15:46:30  _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
2023-12-04 15:46:30 [2023-12-04T15:46:30.244+0000] {executor_loader.py:114} INFO - Loaded executor: CeleryExecutor
2023-12-04 15:46:30 [2023-12-04T15:46:30.346+0000] {scheduler_job_runner.py:788} INFO - Starting the scheduler
2023-12-04 15:46:30 [2023-12-04T15:46:30.347+0000] {scheduler_job_runner.py:795} INFO - Processing each file at most -1 times
2023-12-04 15:46:30 [2023-12-04T15:46:30.361+0000] {manager.py:165} INFO - Launched DagFileProcessorManager with pid: 34
2023-12-04 15:46:30 [2023-12-04T15:46:30.363+0000] {scheduler_job_runner.py:1553} INFO - Resetting orphaned tasks for active dag runs
2023-12-04 15:46:30 [2023-12-04T15:46:30.372+0000] {settings.py:60} INFO - Configured default timezone Timezone('UTC')
2023-12-04 15:47:34 [2023-12-04T15:47:34.390+0000] {scheduler_job_runner.py:411} INFO - 1 tasks up for execution:
2023-12-04 17:31:13 [2023-12-04T17:31:13.351+0000] {scheduler_job_runner.py:411} INFO - 1 tasks up for execution:
2023-12-04 17:31:13     <TaskInstance: test_manual_triggered_dag.ping manual__2023-12-04T01:13:57.864268+00:00 [scheduled]>
2023-12-04 17:31:13 [2023-12-04T17:31:13.351+0000] {scheduler_job_runner.py:476} INFO - DAG test_manual_triggered_dag has 0/16 running and queued tasks
2023-12-04 17:31:13 [2023-12-04T17:31:13.351+0000] {scheduler_job_runner.py:587} INFO - Setting the following tasks to queued state:
2023-12-04 17:31:13     <TaskInstance: test_manual_triggered_dag.ping manual__2023-12-04T01:13:57.864268+00:00 [scheduled]>
2023-12-04 17:31:13 [2023-12-04T17:31:13.353+0000] {scheduler_job_runner.py:625} INFO - Sending TaskInstanceKey(dag_id='test_manual_triggered_dag', task_id='ping', run_id='manual__2023-12-04T01:13:57.864268+00:00', try_number=1, map_index=-1) to executor with priority 3 and queue kubernetes
2023-12-04 17:31:13 [2023-12-04T17:31:13.353+0000] {base_executor.py:147} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'test_manual_triggered_dag', 'ping', 'manual__2023-12-04T01:13:57.864268+00:00', '--local', '--subdir', 'DAGS_FOLDER/etls/yet_another_test_etl.py']
2023-12-04 17:31:13 [2023-12-04T17:31:13.376+0000] {scheduler_job_runner.py:677} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='test_manual_triggered_dag', task_id='ping', run_id='manual__2023-12-04T01:13:57.864268+00:00', try_number=1, map_index=-1)
2023-12-04 17:31:13 [2023-12-04T17:31:13.382+0000] {scheduler_job_runner.py:703} INFO - Setting external_id for <TaskInstance: test_manual_triggered_dag.ping manual__2023-12-04T01:13:57.864268+00:00 [queued]> to ec6a9e7d-4820-49a9-9a11-e1fe379d5f45
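
The log reports "Loaded executor: CeleryExecutor" even though airflow.cfg sets executor = KubernetesExecutor. One thing worth checking (an assumption, nothing in the post confirms it) is whether the scheduler's environment defines AIRFLOW__CORE__EXECUTOR, because Airflow gives AIRFLOW__{SECTION}__{KEY} environment variables precedence over airflow.cfg. If so, the task would be sent to a Celery queue named "kubernetes" that may have no worker listening, which would match a task stuck in the queued state. The sketch below only mimics that precedence rule with the standard library; effective_option, SAMPLE_CFG, and fake_env are hypothetical names for illustration, and Airflow's real lookup has further sources that are ignored here.

```python
import configparser
import os


def effective_option(cfg_text, section, key, environ=None):
    """Return (value, source) for an option, letting an AIRFLOW__SECTION__KEY
    environment variable win over the config file text.

    Simplified illustration only, not Airflow's own resolution code.
    """
    environ = os.environ if environ is None else environ
    env_name = f"AIRFLOW__{section.upper()}__{key.upper()}"
    if env_name in environ:
        return environ[env_name], f"environment variable {env_name}"

    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    if parser.has_option(section, key):
        return parser.get(section, key), "airflow.cfg"
    return None, "not set"


# Hypothetical inputs: a trimmed-down airflow.cfg and an environment such as
# a container definition might inject (assumption, not taken from the post).
SAMPLE_CFG = """
[core]
executor = KubernetesExecutor
"""
fake_env = {"AIRFLOW__CORE__EXECUTOR": "CeleryExecutor"}

value, source = effective_option(SAMPLE_CFG, "core", "executor", fake_env)
print(value, "from", source)
```

Inside the scheduler container, running `airflow config get-value core executor` should show the value Airflow actually resolves, and `env | grep AIRFLOW__` lists any overrides that are present.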