Is it possible to use an operator within a function that is returning an operator, in Airflow?

I am creating an external table in Databricks.

To do so, I am using the DatabricksSqlOperator.

I have a function that returns this Databricks operator with all the needed details filled in.

@dag(...)
.
.
.
def create_external():
    return DatabricksSqlOperator(...)

However, to improve the quality of the code, I want to query the schema table in Databricks before creating the external table (to check for schema changes).

I would like to know whether it is possible to use an operator inside the same function, or whether it has to be an extra "task" in my DAG. Since it's an intermediate task, it would be very ugly to repeat it everywhere.

@dag(...)
.
.
.
def create_external():

    query_result = DatabricksSqlOperator(...)
    if query_result:
        queries = [query1, query2]
    else:
        queries = [query1]

    return DatabricksSqlOperator(sql=queries)

1 answer

Kombajn zbożowy (best answer)

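It needs to be a separate task. Instantiating an operator inside the function only defines a task; it does not run the query or return its result, so the if in your example would be testing the operator object itself. Add the check as its own task, followed by a branch operator, in a sequence like this: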

query1 >> test_query >> branch >> query2

If it is a repeating pattern you can wrap it in a task group:

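from airflow.decorators import task_group
from airflow.operators.python import BranchPythonOperator
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

def create_external():
    def decide(**kwargs):
        # Task ids inside a task group are prefixed with the group id
        result = kwargs['ti'].xcom_pull(task_ids='databricks_group.test')
        return 'databricks_group.query2' if result == "xxx" else None  # adjust the test

    @task_group()
    def databricks_group():
        query1 = DatabricksSqlOperator(...)
        test_query = DatabricksSqlOperator(...)  # task_id='test'
        branch = BranchPythonOperator(
            task_id='branch',
            python_callable=decide  # provide_context is not needed in Airflow 2
        )
        query2 = DatabricksSqlOperator(...)  # task_id='query2'
        # Wire the tasks in the order described above
        query1 >> test_query >> branch >> query2

    return databricks_group()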
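
The branch callable is plain Python, so it can be tried out without a scheduler. Below is a minimal sketch of that idea, not part of the original answer: make_decide and FakeTaskInstance are hypothetical helpers, the "non-empty result means the schema changed" test is only an assumption, and the task ids are built with the group prefix that Airflow adds inside a task group by default.

```python
from typing import Any, Optional


def make_decide(group_id: str, test_task_id: str = "test", follow_task_id: str = "query2"):
    """Build a branch callable whose task ids carry the task group prefix."""

    def decide(ti: Any, **_: Any) -> Optional[str]:
        # Inside a task group, task ids are "<group_id>.<task_id>" by default.
        result = ti.xcom_pull(task_ids=f"{group_id}.{test_task_id}")
        # Assumed test: any non-empty result means the schema changed.
        return f"{group_id}.{follow_task_id}" if result else None

    return decide


class FakeTaskInstance:
    """Stand-in for Airflow's TaskInstance, used only to exercise the callable."""

    def __init__(self, xcoms: dict):
        self._xcoms = xcoms

    def xcom_pull(self, task_ids: str):
        return self._xcoms.get(task_ids)


decide = make_decide("databricks_group")
changed = FakeTaskInstance({"databricks_group.test": [("new_col", "string")]})
unchanged = FakeTaskInstance({"databricks_group.test": []})

print(decide(ti=changed))    # databricks_group.query2
print(decide(ti=unchanged))  # None
```

Passing the resulting decide as python_callable to the BranchPythonOperator keeps the group prefix in one place, which helps if the group is renamed or reused.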

Notes:

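  • Beware of trigger_rule traps in pipelines containing branches: with the default all_success rule, a task downstream of a skipped branch is skipped as well, so a task that joins branches usually needs a rule such as none_failed_min_one_success.
  • If you look into BranchSQLOperator and DatabricksSqlOperator, you might be able to define something like a BranchDatabricksSqlOperator that runs test_query and branch as a single task.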