CPU Limit Issue with Airflow's XCom and KubernetesPodOperator


I'm encountering an issue with Apache Airflow (version 2.3.4) and the Kubernetes Python client (version 23.6.0) when using XCom with the KubernetesPodOperator. The sidecar container that Airflow injects to read the XCom result is created with a default CPU request of 1m, which is below the 100m per-container minimum enforced in my cluster's namespace (a LimitRange). The pod is rejected with the following error, which prevents my workflows from executing:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 132, in run_pod_async
    resp = self._client.create_namespaced_pod(
  File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/client/api/core_v1_api.py", line 7356, in create_namespaced_pod
    return self.create_namespaced_pod_with_http_info(namespace, body, **kwargs)  # noqa: E501
  File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/client/api/core_v1_api.py", line 7455, in create_namespaced_pod_with_http_info
    return self.api_client.call_api(
  File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
  File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
  File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/client/api_client.py", line 391, in request
    return self.rest_client.POST(url,
  File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/client/rest.py", line 275, in POST
    return self.request("POST", url,
  File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/client/rest.py", line 234, in request
    raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': '72861dac-5610-4f1c-96ba-fa20bd9bf59c', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '90e30808-9f9f-45a0-8684-8718a02bc2b4', 'X-Kubernetes-Pf-Prioritylevel-Uid': '10daf657-1b9d-4113-a489-eea29725174c', 'Date': 'Fri, 22 Dec 2023 12:24:29 GMT', 'Content-Length': '334'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods \"get-srx-to-process-63e8201e72124d87ad23290e8d5d87ee\" is forbidden: minimum cpu usage per Container is 100m, but request is 1m","reason":"Forbidden","details":{"name":"get-srx-to-process-63e8201e72124d87ad23290e8d5d87ee","kind":"pods"},"code":403}
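
For what it's worth, the 1m default seems to be hard-coded in the provider rather than something I set. Inspecting the sidecar definition in my environment (the module path is my best guess from the provider package, so it may differ between versions) shows only a 1m CPU request and no limits, which matches the rejected request in the error above:

from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults

# Prints the resource spec of the injected XCom sidecar; in my install this
# shows requests={'cpu': '1m'} and no limits.
print(PodDefaults.SIDECAR_CONTAINER.resources)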

Here's the relevant part of my DAG code:

from datetime import timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import V1PersistentVolumeClaimVolumeSource, V1Volume, V1VolumeMount, V1EnvVar
from kubernetes.client import models as k8s

config = dict(

    default_args=dict(
        # some params
        in_cluster=True,
        is_delete_operator_pod=True,
        on_success_callback=lambda ctx: ctx['ti'].xcom_push(key='status', value='success'),
        on_failure_callback=lambda ctx: ctx['ti'].xcom_push(key='status', value='failed')
    ),

    volume_mounts=dict(
        # uftp=V1VolumeMount(...),

    ),

    volumes=dict(
        # uftp=V1Volume(...),

    ),

    get_srx_to_process=dict(
        limits=k8s.V1ResourceRequirements(
            requests={"cpu": "0.25", "memory": "3G"},
            limits={"cpu": "0.25", "memory": "3G"}),
        retries=2,
    ),

    download_fastq=dict(
        limits=k8s.V1ResourceRequirements(
            requests={"cpu": "2.0", "memory": "6G"},
            limits={"cpu": "2.0", "memory": "6G"}),
    ),

)

with DAG(
        dag_id='TEST',
        default_args=config['default_args'],
        max_active_runs=5,
        description='TEST',
        schedule_interval=timedelta(seconds=30),
        catchup=False,

) as dag:
    get_srx_to_process = KubernetesPodOperator(
        name="get_srx_to_process",
        task_id="get_srx_to_process",
        retries=config['get_srx_to_process']['retries'],
        container_resources=config['get_srx_to_process']['limits'],
        env_vars=[V1EnvVar(name='run_id', value='{{ run_id }}'),
                  V1EnvVar(name='GCP_API', value=Variable.get("GCP_API"))],
        cmds=["python", "get_srx_to_process.py"],
        volume_mounts=[config['volume_mounts']['uftp']],
        volumes=[config['volumes']['uftp']],
        do_xcom_push=True,
    )

    download_fastq = KubernetesPodOperator(
        name="download_fastq",
        task_id="download_fastq",
        container_resources=config['download_fastq']['limits'],
        env_vars=[V1EnvVar(name='run_id', value='{{ run_id }}'),
                  V1EnvVar(name='GCP_API', value=Variable.get("GCP_API")),
                  V1EnvVar(name='SRX_id', value='{{ ti.xcom_pull(task_ids="get_srx_to_process") }}'),
                  ],
        cmds=["python", "download_fastq.py"],
        volume_mounts=[config['volume_mounts']['uftp']],
        volumes=[config['volumes']['uftp']],
    )

    # Basic behavior
    get_srx_to_process >> \
    download_fastq
    # ...

In my larger pipeline, multiple tasks use different Docker images, and XCom's main role is to transfer a single variable between tasks.
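
For context, the value being passed is tiny: the producing container just writes it to /airflow/xcom/return.json, the file the sidecar reads when do_xcom_push=True. The end of get_srx_to_process.py looks roughly like this (the value shown is a placeholder):

import json

# The XCom sidecar picks up whatever the main container writes to
# /airflow/xcom/return.json and pushes it as the task's return value.
srx_id = "SRX0000000"  # placeholder; the real ID comes from the lookup logic
with open("/airflow/xcom/return.json", "w") as f:
    json.dump(srx_id, f)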

How can I adjust the CPU request/limit of the XCom sidecar container so that it meets the cluster's 100m minimum and this error is resolved?
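
One workaround I'm considering (untested) is a pod_mutation_hook in airflow_local_settings.py that bumps the sidecar's resources before the pod is submitted, but I'd prefer a supported setting if one exists. A rough sketch, assuming the injected sidecar container is named airflow-xcom-sidecar:

# airflow_local_settings.py -- untested sketch; assumes the injected sidecar
# container is named "airflow-xcom-sidecar".
from kubernetes.client import models as k8s


def pod_mutation_hook(pod: k8s.V1Pod) -> None:
    # Airflow calls this for every pod before submitting it to the API server.
    for container in pod.spec.containers:
        if container.name == "airflow-xcom-sidecar":
            container.resources = k8s.V1ResourceRequirements(
                requests={"cpu": "100m", "memory": "128Mi"},
                limits={"cpu": "100m", "memory": "128Mi"},
            )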
