Airflow API call returning 400 when clearing and rerunning a task

I am setting up an Airflow environment that runs a few Databricks notebooks. I have set up a create_cluster task using PythonOperator that creates a cluster, grants permissions, and installs the required libraries, all through the Databricks REST API 2.0.

I am facing a problem with the API call that installs the libraries. When I trigger a DAG run, everything works fine. However, when I clear the create_cluster task so that it reruns, the Libraries API returns a 400 response. I am not sure why this happens. Here is the API call I am making:

response_lib = s.post('https://%s/api/2.0/libraries/install' % (DOMAIN),
        headers={'Authorization': 'Bearer %s' % TOKEN},
        json={
            "cluster_id": clusterid,
            "libraries": [
                {
                    "jar": "dbfs:/FileStore/jars/4092ccd0_a657_4de2_865a_6a413580bbcd-ojdbc8.jar"
                },
                {
                    "jar": "dbfs:/FileStore/jars/d4c36be6_f697_443e_84fd_179ce07e510a-fc98ca3a_e4c2_48de_8a76_63153afe6588_spark_salesforce_assembly_1_1_3_PR46_maxCharsPerColumn_c6b42-231af.jar"
                },
                {
                    "jar": "dbfs:/FileStore/jars/2e72bd30_5861_4f2f_ba94_b591b3c604b0-jars_a68bd7d7_5e75_493c_8720_c70ff3c1f58e_RedshiftJDBC42_1_2_12_1017_fefdf-6e9da.jar"
                },
                {
                    "maven": {
                        "coordinates": "com.springml:spark-salesforce_2.11:1.1.3"
                        }
                },
                {
                    "pypi": {
                        "package": "cx_Oracle"
                        }
                },
                {
                    "pypi": {
                        "package": "numpy==1.16.1"
                        }
                },
                {
                    "pypi": {
                        "package": "tabulate"
                        }
                },
                {
                    "pypi": {
                        "package": "pysftp"
                        }
                },
                {
                    "pypi": {
                        "package": "s3fs"
                        }
                },
                {
                    "pypi": {
                        "package": "regex"
                        }
                }
    
                ]
            }
        )
        
if response_lib.status_code == 200:
    print('Library installation response', response_lib)
else:
    print("Libraries could not be installed")
    print(response_lib)
        

This is the output of print(response_lib) when the task is retried: <Response [400]>

I am using Airflow managed by AWS. Can someone please help me understand why this is happening? For reference, see the official Databricks documentation on the Libraries API 2.0.
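Printing the response object only shows <Response [400]>, which hides whatever explanation the server sent back. A minimal sketch of a helper that logs the status code together with the response body might look like this. The name describe_response is hypothetical, and it assumes the response object behaves like one from the requests library (status_code, text, and json()). Whether the body actually names the cause in this case is not something the post confirms.

```python
def describe_response(response):
    """Return a one-line summary with the status code and the response body."""
    try:
        # Prefer the parsed JSON body when the server sent one.
        body = response.json()
    except ValueError:
        # Fall back to the raw text when the body is not valid JSON.
        body = response.text
    return "HTTP %s: %s" % (response.status_code, body)
```

Calling print(describe_response(response_lib)) in the else branch would then show the body of the 400 response instead of only its status.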

Answer by Vaishak (best answer)

I resolved this by adding a 10-second delay before calling the install API. This seems to solve the issue.

Code:

from threading import Event

# Pause for 10 seconds before calling the install API.
Event().wait(10)
response_lib = s.post('https://%s/api/2.0/libraries/install' % (DOMAIN),
        headers={'Authorization': 'Bearer %s' % TOKEN},
...........
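A fixed 10-second pause works here, but the right amount of waiting may vary from run to run. As a variation on the same idea, and not something the answer itself proposes, the install call could be retried a few times with a pause between attempts. The sketch below assumes a hypothetical zero-argument callable send_request that performs the POST and returns an object with a status_code attribute. The names post_with_retry, attempts, and delay_seconds are illustrative only.

```python
import time


def post_with_retry(send_request, attempts=5, delay_seconds=10, sleep=time.sleep):
    """Call send_request() until it returns HTTP 200 or the attempts run out.

    send_request is a hypothetical zero-argument callable that performs the
    POST and returns a response object with a status_code attribute.
    """
    response = None
    for attempt in range(1, attempts + 1):
        response = send_request()
        if response.status_code == 200:
            return response
        # Wait before the next attempt, but not after the last one.
        if attempt < attempts:
            sleep(delay_seconds)
    # Return the last response so the caller can inspect the failure.
    return response
```

To use it, the existing s.post call would be wrapped in a lambda and passed as send_request, and the returned response checked as before. The sleep parameter exists only so the waiting can be replaced in tests.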