I'm encountering an issue with Apache Airflow (version 2.3.4) and the kubernetes Python client (version 23.6.0) when using XCom with the KubernetesPodOperator. The sidecar container that Airflow injects to hand the XCom result back is created with a default CPU request of 1m, which is below the 100m per-container minimum enforced in my namespace (by a LimitRange, as far as I can tell). This causes the following error and prevents my workflows from running:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 132, in run_pod_async
resp = self._client.create_namespaced_pod(
File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/client/api/core_v1_api.py", line 7356, in create_namespaced_pod
return self.create_namespaced_pod_with_http_info(namespace, body, **kwargs) # noqa: E501
File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/client/api/core_v1_api.py", line 7455, in create_namespaced_pod_with_http_info
return self.api_client.call_api(
File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/client/api_client.py", line 348, in call_api
return self.__call_api(resource_path, method,
File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
response_data = self.request(
File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/client/api_client.py", line 391, in request
return self.rest_client.POST(url,
File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/client/rest.py", line 275, in POST
return self.request("POST", url,
File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/client/rest.py", line 234, in request
raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': '72861dac-5610-4f1c-96ba-fa20bd9bf59c', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '90e30808-9f9f-45a0-8684-8718a02bc2b4', 'X-Kubernetes-Pf-Prioritylevel-Uid': '10daf657-1b9d-4113-a489-eea29725174c', 'Date': 'Fri, 22 Dec 2023 12:24:29 GMT', 'Content-Length': '334'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods \"get-srx-to-process-63e8201e72124d87ad23290e8d5d87ee\" is forbidden: minimum cpu usage per Container is 100m, but request is 1m","reason":"Forbidden","details":{"name":"get-srx-to-process-63e8201e72124d87ad23290e8d5d87ee","kind":"pods"},"code":403}
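From what I can tell by reading the provider source, the sidecar is defined in airflow.providers.cncf.kubernetes.utils.xcom_sidecar with a hard-coded 1m CPU request and, in this provider version, no operator-level option to override it. The snippet below is only how I confirmed both sides of the conflict on my setup; the module path, attribute names, and the namespace name reflect my installation (the namespace is a placeholder) and may differ elsewhere:

# Inspect the XCom sidecar defaults shipped with the cncf.kubernetes provider
# (module path as found in my installed provider version; it may differ in others).
from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults

print(PodDefaults.SIDECAR_CONTAINER.name)       # "airflow-xcom-sidecar"
print(PodDefaults.SIDECAR_CONTAINER.resources)  # requests={'cpu': '1m'} on my install

# Compare against the namespace's LimitRange, which is what rejects the pod with the 403.
from kubernetes import client, config

config.load_incluster_config()  # or config.load_kube_config() outside the cluster
for lr in client.CoreV1Api().list_namespaced_limit_range("my-airflow-namespace").items:
    for item in lr.spec.limits:
        print(item.type, item.min)  # shows the 100m per-Container minimum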
Here's the relevant part of my DAG code:
from datetime import timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import V1PersistentVolumeClaimVolumeSource, V1Volume, V1VolumeMount, V1EnvVar
from kubernetes.client import models as k8s

config = dict(
    default_args=dict(
        # some params
        in_cluster=True,
        is_delete_operator_pod=True,
        on_success_callback=lambda ctx: ctx['ti'].xcom_push(key='status', value='success'),
        on_failure_callback=lambda ctx: ctx['ti'].xcom_push(key='status', value='failed'),
    ),
    volume_mounts=dict(
        # uftp=V1VolumeMount(...),
    ),
    volumes=dict(
        # uftp=V1Volume(...),
    ),
    get_srx_to_process=dict(
        limits=k8s.V1ResourceRequirements(
            requests={"cpu": "0.25", "memory": "3G"},
            limits={"cpu": "0.25", "memory": "3G"}),
        retries=2,
    ),
    download_fastq=dict(
        limits=k8s.V1ResourceRequirements(
            requests={"cpu": "2.0", "memory": "6G"},
            limits={"cpu": "2.0", "memory": "6G"}),
    ),
)

with DAG(
    dag_id='TEST',
    default_args=config['default_args'],
    max_active_runs=5,
    description='TEST',
    schedule_interval=timedelta(seconds=30),
    catchup=False,
) as dag:
    get_srx_to_process = KubernetesPodOperator(
        name="get_srx_to_process",
        task_id="get_srx_to_process",
        retries=config['get_srx_to_process']['retries'],
        container_resources=config['get_srx_to_process']['limits'],
        env_vars=[V1EnvVar(name='run_id', value='{{ run_id }}'),
                  V1EnvVar(name='GCP_API', value=Variable.get("GCP_API"))],
        cmds=["python", "get_srx_to_process.py"],
        volume_mounts=[config['volume_mounts']['uftp']],
        volumes=[config['volumes']['uftp']],
        do_xcom_push=True,  # this is what triggers injection of the XCom sidecar container
    )

    download_fastq = KubernetesPodOperator(
        name="download_fastq",
        task_id="download_fastq",
        container_resources=config['download_fastq']['limits'],
        env_vars=[V1EnvVar(name='run_id', value='{{ run_id }}'),
                  V1EnvVar(name='GCP_API', value=Variable.get("GCP_API")),
                  V1EnvVar(name='SRX_id', value='{{ ti.xcom_pull(task_ids="get_srx_to_process") }}'),
                  ],
        cmds=["python", "download_fastq.py"],
        volume_mounts=[config['volume_mounts']['uftp']],
        volumes=[config['volumes']['uftp']],
    )

    # Basic behavior
    get_srx_to_process >> download_fastq
    # ...
In my larger pipeline, multiple tasks use different Docker images, and XCom's main role is to transfer a single variable between tasks.
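For context on why the sidecar exists at all: with do_xcom_push=True the KubernetesPodOperator expects the main container to write its result to /airflow/xcom/return.json, and the injected sidecar keeps the pod alive so Airflow can read that file. get_srx_to_process.py ends with something along these lines (simplified sketch; the real script does more, and the SRX value here is a placeholder):

# get_srx_to_process.py (simplified): the value to be passed via XCom is written as JSON
# to /airflow/xcom/return.json, which Airflow reads through the injected sidecar container.
import json

srx_id = "SRX1234567"  # placeholder; the real script computes this

with open("/airflow/xcom/return.json", "w") as f:
    json.dump(srx_id, f)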
How can I raise the CPU request (and limit) of the XCom sidecar container so that it satisfies the namespace's 100m minimum and the pods stop being rejected?
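One workaround I'm considering is a pod_mutation_hook in airflow_local_settings.py that bumps the sidecar's resources before the pod is submitted. A minimal sketch is below; it assumes the sidecar keeps its default name airflow-xcom-sidecar and that the hook runs after the sidecar has been injected (which appears to be the case in my provider version, but I haven't verified this across versions):

# airflow_local_settings.py (on the scheduler/worker image) - sketch of a possible workaround.
from kubernetes.client import models as k8s


def pod_mutation_hook(pod: k8s.V1Pod) -> None:
    # Raise the XCom sidecar's CPU request/limit so the namespace LimitRange accepts the pod.
    for container in pod.spec.containers or []:
        if container.name == "airflow-xcom-sidecar":
            container.resources = k8s.V1ResourceRequirements(
                requests={"cpu": "100m", "memory": "64Mi"},
                limits={"cpu": "100m", "memory": "64Mi"},
            )

Is this the intended way to handle it, or is there a supported operator/provider setting I'm missing? I could also lower the minimum in the namespace's LimitRange, but I'd prefer to solve it on the Airflow side.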