How to list all connections clearly within composer/airflow?


I am trying to create a Composer environment using an infrastructure-as-code approach. For this, I need to store and retrieve Airflow variables programmatically and keep them versioned somewhere.

In a previous post, Ed Morton wrote a script to convert the table to JSON, but there is a problem related to the way composer/airflow outputs the data when using the following command:

gcloud composer environments run $COMPOSER_ENV --location <location> connections -- --list

A sample of the output is:

╒════════════════════════════════╤═════════════════════════════╤════════════════════════════════╤════════╤════════════════╤══════════════════════╤════════════════════════════════╕
│ Conn Id                        │ Conn Type                   │ Host                           │ Port   │ Is Encrypted   │ Is Extra Encrypted   │ Extra                          │
╞════════════════════════════════╪═════════════════════════════╪════════════════════════════════╪════════╪════════════════╪══════════════════════╪════════════════════════════════╡
│ 'airflow_db'                   │ 'mysql'                     │ 'airflow-sqlp...rvice.default' │ None   │ True           │ False                │ None                           │
├────────────────────────────────┼─────────────────────────────┼────────────────────────────────┼────────┼────────────────┼──────────────────────┼────────────────────────────────┤

As you can see, the problem is that the Host and Extra columns contain an ellipsis (...) that abridges long text, as in 'airflow-sqlp...rvice.default'.

How do I get a full version of the information output by the above-mentioned (composer) utility?

I am using composer-1.12.1-airflow-1.10.9. Unfortunately, the nice feature of exporting connections to JSON via the CLI is only available in newer versions of Airflow.
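In those newer versions (Airflow 2.x) it would be as simple as something like:

airflow connections export connections.json

or

airflow connections list --output json

but neither is available on airflow-1.10.9.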


There are 3 answers

arunvelsriram:

I work with Airflow but have never used Composer. However, from the documentation I learned that gcloud composer environments run executes Airflow CLI sub-commands remotely.

The Airflow CLI has a command to open a DB shell, airflow shell, and it can receive input from stdin. So I tried piping in a SQL statement to retrieve the connections, and it worked.

> echo "select * from connection limit 3;" | airflow shell
/usr/local/Caskroom/miniconda/base/envs/airflow-demo/lib/python3.7/site-packages/airflow/configuration.py:761: DeprecationWarning: You have two airflow.cfg files: /Users/arunvelsriram/airflow/airflow.cfg and /Users/arunvelsriram/spikes/airflow/airflow-demo/airflow_home/airflow.cfg. Airflow used to look at ~/airflow/airflow.cfg, even when AIRFLOW_HOME was set to a different value. Airflow will now only read /Users/arunvelsriram/spikes/airflow/airflow-demo/airflow_home/airflow.cfg, and you should remove the other file
  category=DeprecationWarning,
DB: sqlite:///airflow_home/airflow.db
1|airflow_db|mysql|mysql|airflow|root||||0|0
2|beeline_default|beeline|localhost|default|||10000|{"use_beeline": true, "auth": ""}|0|0
3|bigquery_default|google_cloud_platform||default|||||0|0

We could also extract the results as JSON or CSV; most databases support it. For example, in SQLite:

> echo "select
json_group_array(
        json_object(
        'id', id,
        'conn_id', conn_id,
        'conn_type', conn_type,
        'host', host, 'schema', schema,
        'login', login,
        'password', password,
        'port', port,
        'extra', extra,
        'is_encrypted', is_encrypted,
        'is_extra_encrypted', is_extra_encrypted
    )
) as json_result
from (select * from connection limit 3);" | airflow shell
/usr/local/Caskroom/miniconda/base/envs/airflow-demo/lib/python3.7/site-packages/airflow/configuration.py:761: DeprecationWarning: You have two airflow.cfg files: /Users/arunvelsriram/airflow/airflow.cfg and /Users/arunvelsriram/spikes/airflow/airflow-demo/airflow_home/airflow.cfg. Airflow used to look at ~/airflow/airflow.cfg, even when AIRFLOW_HOME was set to a different value. Airflow will now only read /Users/arunvelsriram/spikes/airflow/airflow-demo/airflow_home/airflow.cfg, and you should remove the other file
  category=DeprecationWarning,
DB: sqlite:///airflow_home/airflow.db
[{"id":1,"conn_id":"airflow_db","conn_type":"mysql","host":"mysql","schema":"airflow","login":"root","password":null,"port":null,"extra":null,"is_encrypted":0,"is_extra_encrypted":0},{"id":2,"conn_id":"beeline_default","conn_type":"beeline","host":"localhost","schema":"default","login":null,"password":null,"port":10000,"extra":"{\"use_beeline\": true, \"auth\": \"\"}","is_encrypted":0,"is_extra_encrypted":0},{"id":3,"conn_id":"bigquery_default","conn_type":"google_cloud_platform","host":null,"schema":"default","login":null,"password":null,"port":null,"extra":null,"is_encrypted":0,"is_extra_encrypted":0}]

I couldn't try it on Composer as I don't have a Composer environment. This is just a trick I could think of, since the current version of the Airflow CLI doesn't have configurable output.
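As another rough sketch (again untested on Composer): instead of raw SQL, the same data can be read through Airflow's own ORM models with a small Python script run inside a worker. It only uses the standard Connection model; note that extra is returned decrypted, so treat the output as sensitive.

# Sketch: list all connections via Airflow's ORM (Airflow 1.10.x), run inside a worker.
import json

from airflow.models import Connection
from airflow.settings import Session

session = Session()
rows = session.query(Connection).all()

# Password is deliberately omitted here; add it only if you really need it.
print(json.dumps(
    [
        {
            "conn_id": c.conn_id,
            "conn_type": c.conn_type,
            "host": c.host,
            "schema": c.schema,
            "login": c.login,
            "port": c.port,
            "extra": c.extra,
        }
        for c in rows
    ],
    indent=2,
))
session.close()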

milia:

Adding to the excellent solution of @savsr, here are the steps that lead up to it. You can run the following commands in Cloud Shell.

According to the Google Cloud Platform docs:

You can get all the pods by performing the following command:

kubectl get pods --all-namespaces

Next, look for a pod with a name like airflow-worker-1a2b3c-x0yz.
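If there are many pods, a plain grep narrows the listing down (nothing Composer-specific here):

kubectl get pods --all-namespaces | grep airflow-worker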

Let's assume that this pod is in the namespace composer-1-6-0-airflow-example-namespace. Then connect to it and get a bash shell with the following command:

kubectl -n composer-1-6-0-airflow-example-namespace \
  exec -it airflow-worker-1a2b3c-x0yz -c airflow-worker -- /bin/bash

Now that you are inside this specific airflow worker, in a bash shell, you can perform the commands that @savsr has mentioned.

I tried this and it worked for me:

echo "select * from connection limit 3;" | airflow shell

Also, you can run a more restrictive query that returns exactly the host names you want:

echo "select host from connection limit 3;" | airflow shell

which in my example yielded:

host
airflow-sqlproxy-service.default
localhost
NULL
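
The same trick works for pulling the full, untruncated Host and Extra of one particular connection; for example (substitute your own conn_id):

echo "select conn_id, host, extra from connection where conn_id = 'airflow_db';" | airflow shell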
kk1957:

  1. Connect to the correct pod:
gcloud container clusters get-credentials PROJECT_PATH  --zone ZONE

PROJECT_PATH is something like projects/..../..gke: this is the value of the GKE cluster field in the Environment details section.
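If you don't want to copy it from the console, it can (as far as I know) also be read with gcloud, along the lines of:

gcloud composer environments describe $COMPOSER_ENV --location <location> --format="value(config.gkeCluster)"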

kubectl get pods --all-namespaces
 
  2. From the NAMESPACE column, take the name of a composer-* row, and from the NAME column, take the name of an airflow-worker-* row. Then connect to the worker as follows:
kubectl exec -itn composer-1-10-0-airflow-1-10-6-5983e0fe airflow-worker-8d8c49c87-9v7c4 -- /bin/bash
  3. Once inside the worker, connect to the database shell:
airflow shell

NOTE: If the shell command is not available, follow the instructions at https://cloud.google.com/composer/docs/access-airflow-database#airflow-1 to access the database, and then run the following SQL:

select   json_arrayagg(
            json_object(
             'id', id,
             'conn_id', conn_id,
             'conn_type', conn_type,
             'host', host,
             'login',login,
             'password',password,
             'port',port,
             'extra', extra,
             'is_encrypted', is_encrypted,
             'is_extra_encrypted', is_extra_encrypted,
             'schema',IFNULL(`schema`, null)
           )
        ) as json_result
from (select * from connection  ) x;
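
One way to run it from inside the worker (a sketch: save the query above to a file, say connections.sql, and feed it to the DB shell; airflow shell may print a leading "DB: ..." line, so strip that from the result if needed):

airflow shell < connections.sql > connections.json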