pod_mutation_hook function not working on airflow running in kubernetes using KubernetesExecutor


I am attempting to migrate an Airflow deployment running in Kubernetes from the CeleryExecutor to the KubernetesExecutor. Everything went smoothly in my local development environment (running on minikube); however, in production I need to load a sidecar container that runs a proxy which lets me connect to my SQL database. After some googling, it appears that defining a pod_mutation_hook function in an airflow_local_settings.py file somewhere on the $PYTHONPATH is how one is supposed to accomplish this.
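For reference, the kind of mutation I ultimately want looks roughly like the sketch below. This is only illustrative: the proxy image, command, and port are placeholders for my actual sidecar, and depending on the Airflow version the hook may receive the older airflow.contrib Pod object (as in my test example further down) rather than a kubernetes.client V1Pod.

from kubernetes.client import models as k8s

def pod_mutation_hook(pod: k8s.V1Pod):
    # Placeholder sidecar: image, command, and port are stand-ins for the real proxy.
    proxy_sidecar = k8s.V1Container(
        name="sql-proxy",
        image="gcr.io/cloudsql-docker/gce-proxy:1.16",
        command=["/cloud_sql_proxy", "-instances=<project>:<region>:<instance>=tcp:5432"],
        ports=[k8s.V1ContainerPort(container_port=5432)],
    )
    pod.spec.containers.append(proxy_sidecar)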

First I tried defining this in a ConfigMap, per this example, e.g.:

apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-config
  namespace: dev
data:
  ...

  AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM: "airflow-logs"

  AIRFLOW__KUBERNETES__AIRFLOW_LOCAL_SETTINGS_CONFIGMAP: "airflow-config"
  ...

  airflow_local_settings.py: |
    from airflow.contrib.kubernetes.pod import Pod

    def pod_mutation_hook(pod: Pod):
        extra_labels = {
            "test-label": "True",
        }
        pod.labels.update(extra_labels)

I specified this ConfigMap in the airflow.cfg file, and it gets picked up and mounted fine: all of the other environment variables work correctly, and the logs volume claim specified here also works. However, pod_mutation_hook does not appear to run, as no labels are added to the resulting pod launched by the KubernetesExecutor.

Next I tried defining the airflow_local_settings.py file in the image that Airflow launches for the job, at $AIRFLOW_HOME/configs/airflow_local_settings.py, as suggested in a comment here. I also removed the relevant sections from the airflow-config ConfigMap above. This also appeared to have no effect: the resulting pod created for the job still lacked the specified labels.

So, I am unsure how to proceed at this point, because I don't understand how I am supposed to specify the airflow_local_settings.py file and the pod_mutation_hook function such that they actually mutate the pod before running. Any help would be greatly appreciated. Thank you.


There are 3 answers

Answer by kaxil:

Summary:

At the very least, you should put your airflow_local_settings.py file on the PYTHONPATH of the Scheduler if you want the sidecars on all pods launched by the KubernetesExecutor, or by the KubernetesPodOperator (when used with a different executor), since the pods for both of them are launched by the Scheduler.

However, if you also want sidecars on the pods launched by the KubernetesPodOperator when using the KubernetesExecutor, you will need to set airflow_local_settings_configmap in airflow.cfg (as done at https://github.com/astronomer/airflow-chart/blob/f3dddeffe43c92d594cfcfe9c5b001680f45a986/templates/configmap.yaml#L72), because when you use the KubernetesPodOperator with the KubernetesExecutor, the task pods (the ones created by the KubernetesPodOperator) are launched by the worker pod.
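For example, assuming the ConfigMap is named airflow-config as in the question, the relevant airflow.cfg stanza would look something like the following (the AIRFLOW__KUBERNETES__AIRFLOW_LOCAL_SETTINGS_CONFIGMAP environment variable already used in the question is the equivalent way to set it):

[kubernetes]
airflow_local_settings_configmap = airflow-config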

Notice how we also pass the same ConfigMap to the Scheduler deployment (https://github.com/astronomer/airflow-chart/blob/f3dddeffe43c92d594cfcfe9c5b001680f45a986/templates/scheduler/scheduler-deployment.yaml#L125-L135) and reference it in airflow.cfg itself, since we want all the pods to be mutated via the pod_mutation_hook.

Details:

The airflow.cfg and airflow_local_settings.py files need to exist on the Scheduler (whether your Scheduler runs on a VM or in a pod is irrelevant here). We have also added documentation on where to put this file: https://airflow.apache.org/docs/stable/concepts.html#where-to-put-airflow-local-settings-py

pod_mutation_hook is now used whenever you use the KubernetesExecutor or the KubernetesPodOperator; the pods launched by either of them go through this mutation hook.

Now, back to the ConfigMap. In the case where you are using the KubernetesExecutor and have a task that uses the KubernetesPodOperator, you need both the airflow.cfg and the airflow_local_settings.py file to exist on the worker pods launched by the KubernetesExecutor.

The KubernetesExecutor launches a Worker Pod for this task.

Scheduler Pod ---> Worker Pod (Pod_1 -- launched by the KubernetesExecutor) ---> Task Pod (Pod_2 -- launched by Pod_1 for the task using the KubernetesPodOperator)

Now, the entire [kubernetes] section in airflow.cfg (https://github.com/apache/airflow/blob/master/airflow/config_templates/default_airflow.cfg#L870-L1028) is used only by the KubernetesExecutor and affects what is mounted on the worker pods it launches.

If you don't specify the airflow_local_settings ConfigMap, the airflow_local_settings.py file is not mounted on the worker pod (Pod_1 in the example above); only the airflow.cfg file is mounted. So for Pod_2 (launched by Pod_1) -- the special case where you use the KubernetesPodOperator with the KubernetesExecutor -- Pod_1 (the worker pod) does not have the airflow_local_settings.py file even though the Scheduler does, and Pod_2 is therefore not mutated because the file does not exist there.

Think of it the same way as airflow.cfg: just as you mount the airflow.cfg file on both the Scheduler pod and the worker pod, for this edge case you need the airflow_local_settings.py file in both places.

https://github.com/apache/airflow/blob/ba2d6408e64f219e8f53a20a5a149e3d8109db31/airflow/kubernetes/worker_configuration.py#L279-L305 --> This code is used to decide what is mounted on the Worker Pod (REF_1)

https://github.com/apache/airflow/blob/ba2d6408e64f219e8f53a20a5a149e3d8109db31/airflow/executors/kubernetes_executor.py#L462-L481 --> Pod created for each task run by the KubernetesExecutor (REF_2) -- the mutation is applied to this pod, as it is launched by the Scheduler, which has the airflow_local_settings.py file

https://github.com/apache/airflow/blob/ba2d6408e64f219e8f53a20a5a149e3d8109db31/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py#L383 --> This code is used to create a new pod when using the KubernetesPodOperator (REF_3) -- since airflow_local_settings.py wasn't mounted on the pod generated in REF_2, the mutations weren't applied to this pod.

Answer by Luis Magana:

I had the same issue; please ensure that airflow_local_settings can be imported from the scheduler. You will have to bake these changes into the image.

WORKDIR ${AIRFLOW_USER_HOME}
# Put the config directory on the PYTHONPATH so airflow_local_settings.py can be imported
ENV PYTHONPATH $PYTHONPATH:$AIRFLOW_HOME/config/
COPY airflow_local_settings.py $AIRFLOW_HOME/config/airflow_local_settings.py
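Once the image is built, a quick sanity check (just a sketch; run it inside the scheduler container) is to confirm that the module actually resolves on the PYTHONPATH and exposes the hook:

# Verify that airflow_local_settings is importable and defines pod_mutation_hook
import importlib

settings = importlib.import_module("airflow_local_settings")
print(hasattr(settings, "pod_mutation_hook"))  # should print True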

Using the ConfigMap you highlighted above will get the file onto the executor pods, but at that point it is no longer needed, so it is mostly a useless setting. Feel free to read through the source code:

https://github.com/apache/airflow/blob/8465d66f05baeb73dd4479b019515c069444616e/airflow/settings.py

Answer by 1220122:

Are you setting "airflow_local_settings_configmap = airflow-configmap" in your airflow.cfg file?