I am attempting to migrate an Airflow deployment running in Kubernetes from the CeleryExecutor to the KubernetesExecutor. Everything went smoothly in my local development environment (running on minikube); however, in production I need to load a sidecar container to run a proxy that allows me to connect to my SQL database. After some googling, it appears that defining a pod_mutation_hook function in an airflow_local_settings.py file somewhere on the $PYTHONPATH is how one is supposed to accomplish this.
First I tried defining this in a ConfigMap, per this example, e.g.:
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-config
  namespace: dev
data:
  ...
  AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM: "airflow-logs"
  AIRFLOW__KUBERNETES__AIRFLOW_LOCAL_SETTINGS_CONFIGMAP: "airflow-config"
  ...
  airflow_local_settings.py: |
    from airflow.contrib.kubernetes.pod import Pod

    def pod_mutation_hook(pod: Pod):
        extra_labels = {
            "test-label": "True",
        }
        pod.labels.update(extra_labels)
```
I specified this ConfigMap in the airflow.cfg file, and it gets picked up and mounted fine; all the other environment variables work correctly. However, pod_mutation_hook does not appear to run, as no labels are added to the resulting pod launched by the KubernetesExecutor (note that the logs volume claim is also specified here and works correctly).
Next I tried defining the airflow_local_settings.py file in the image that Airflow launches for the job, under $AIRFLOW_HOME/configs/airflow_local_settings.py, as suggested in a comment here. I also removed the relevant sections from the airflow-config ConfigMap above. This also appeared to have no effect on the resulting pod created for the job, as it also lacked the specified labels.
So I am unsure how to proceed at this point, because I don't understand how I am supposed to specify the airflow_local_settings.py file and the pod_mutation_hook function such that they actually mutate the pod before it runs. Any help would be greatly appreciated. Thank you.
Summary:
You should put your airflow_local_settings.py file on the PYTHONPATH for the Scheduler, at the very least, if you want the sidecars on all pods launched by the KubernetesExecutor or the KubernetesPodOperator (with a different executor), since the pods for both of them are launched by the Scheduler.
However, if you also want sidecars on the pods launched by the KubernetesPodOperator when using the KubernetesExecutor, you will need to set airflow_local_settings_configmap in airflow.cfg (as done at https://github.com/astronomer/airflow-chart/blob/f3dddeffe43c92d594cfcfe9c5b001680f45a986/templates/configmap.yaml#L72), because when you use KubernetesPodOperator with KubernetesExecutor, the task pods (with KubernetesPodOperator) are launched by the worker pod.
Notice how we also pass the same ConfigMap to the Scheduler deployment (https://github.com/astronomer/airflow-chart/blob/f3dddeffe43c92d594cfcfe9c5b001680f45a986/templates/scheduler/scheduler-deployment.yaml#L125-L135) and set it in airflow.cfg itself, since we want all the pods to be mutated via pod_mutation_hook.
Details:
The airflow.cfg and airflow_local_settings.py files need to exist on the Scheduler (whether your Scheduler is on a VM or in a pod isn't relevant here). We have also added documentation on where to put this file: https://airflow.apache.org/docs/stable/concepts.html#where-to-put-airflow-local-settings-py
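For reference, a minimal airflow.cfg fragment for this setup might look roughly like the following (the ConfigMap name airflow-config is taken from the question; adjust the name and namespace to your deployment):

```ini
[kubernetes]
# Name of the ConfigMap containing airflow_local_settings.py, so the
# KubernetesExecutor mounts the file into the worker pods it launches.
# Without this, only airflow.cfg is mounted on the worker pods.
airflow_local_settings_configmap = airflow-config
```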
The pod_mutation_hook is now used whenever you use the KubernetesExecutor or the KubernetesPodOperator; the pods launched by either of them will use this mutation hook.
Now, back to the ConfigMap. In the case where you are using the KubernetesExecutor and have a task that uses KubernetesPodOperator, you need both the airflow.cfg and airflow_local_settings.py files to exist on the worker pods launched by the KubernetesExecutor. The KubernetesExecutor launches a worker pod for this task:
Scheduler Pod ---> Worker Pod (Pod_1 -- launched by the KubernetesExecutor) ---> Pod_2 (launched by Pod_1, the task using KubernetesPodOperator)
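To make the mutation itself concrete, here is a sketch of a pod_mutation_hook that both labels the pod and injects a proxy sidecar, which is the questioner's actual goal. On recent Airflow versions the hook receives a kubernetes.client.V1Pod; here a plain dict stands in for the pod spec so the logic is runnable on its own, and the sidecar name and image are hypothetical placeholders for something like a SQL proxy:

```python
def pod_mutation_hook(pod):
    """Mutate a pod spec (dict stand-in for V1Pod) before it is launched."""
    # Label every pod so mutated pods are easy to identify.
    pod.setdefault("metadata", {}).setdefault("labels", {})["test-label"] = "True"

    # Append the proxy sidecar alongside the task container.
    sidecar = {
        "name": "sql-proxy",               # hypothetical container name
        "image": "example/sql-proxy:1.0",  # hypothetical image
    }
    containers = pod.setdefault("spec", {}).setdefault("containers", [])
    # Guard against double-injection if the hook runs more than once.
    if not any(c.get("name") == sidecar["name"] for c in containers):
        containers.append(sidecar)
    return pod


pod = {"metadata": {"labels": {}}, "spec": {"containers": [{"name": "base"}]}}
mutated = pod_mutation_hook(pod)
print(mutated["metadata"]["labels"]["test-label"])  # -> True
print(len(mutated["spec"]["containers"]))           # -> 2
```

The important point for this question is not the hook's body but where the file containing it lives: the hook only runs on a machine (or pod) that actually has airflow_local_settings.py on its PYTHONPATH.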
Now the entire [kubernetes] section in airflow.cfg (https://github.com/apache/airflow/blob/master/airflow/config_templates/default_airflow.cfg#L870-L1028) is used only for KubernetesExecutor and affects what is mounted on the Worker Pods launched by KubernetesExecutor.
If you don't specify the airflow_local_settings ConfigMap, the airflow_local_settings.py file will not be mounted to the worker pod (Pod_1 in the above example), and only the airflow.cfg file is mounted. So for Pod_2 (launched by Pod_1) -- the special case when you use KubernetesPodOperator with KubernetesExecutor -- since Pod_1 (the worker pod) doesn't have the airflow_local_settings.py file, even though the Scheduler has it, Pod_2 will not be mutated because the file doesn't exist there.
Consider it the same as airflow.cfg: just as you mount the airflow.cfg file to both the Scheduler pod and the worker pod, for this edge case you need the airflow_local_settings.py file in both places.
file at both the places.https://github.com/apache/airflow/blob/ba2d6408e64f219e8f53a20a5a149e3d8109db31/airflow/kubernetes/worker_configuration.py#L279-L305 --> This code is used to decide what is mounted on the Worker Pod (REF_1)
https://github.com/apache/airflow/blob/ba2d6408e64f219e8f53a20a5a149e3d8109db31/airflow/executors/kubernetes_executor.py#L462-L481 --> Pod created for each task run by KubernetesExecutor (REF_2) -- The mutation is applied to this POD as this is launched by the Scheduler and it has
airflow_local_settings.py
filehttps://github.com/apache/airflow/blob/ba2d6408e64f219e8f53a20a5a149e3d8109db31/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py#L383 --> This code is used to create a new POD when using KubernetesPod Operator (REF_3) -- Since
airflow_local_settings.py
wasn't mounted on POD generated in REF_2 the mutations weren't applied to this POD.