Databricks SQL Variables and if/else task in workflow


I have 2 tasks in a Databricks job workflow. The first task is of type SQL, and the SQL task is a query. In that query I've declared 2 variables and SET their values by running a query, e.g.:

DECLARE VARIABLE max_timestamp TIMESTAMP DEFAULT '1970-01-01';
SET VARIABLE max_timestamp = (SELECT MAX(timestamp) FROM TableA);
SELECT max_timestamp;

Can I pass this max_timestamp variable to the next if/else task to check whether it is null or has a value, and then pass the value of max_timestamp on to another task? Or is there any workaround to do this? I want to use a SQL warehouse for this.

I created a SQL task that assigns the max timestamp value from a subquery to the SQL variable, and I also created an if/else task whose condition references {{tasks.[task_name].values.[value_name]}}, but the job throws the error below on the if/else task.

Run failed with error message:

Failed to resolve references: Task value referenced by 'tasks.task_name.values.max_timestamp' was not found.

Also: will the query-based dropdown list option in the Databricks SQL editor execute the query multiple times if I use the variable in multiple places in a big SQL query? If yes, then it will execute the max_timestamp query every time I refer to it.

1 Answer

Answered by JayashankarGS:

DECLARE is not supported on SQL warehouses; it is supported only on Databricks Runtime 14.1 and above. And even on Databricks Runtime 14.1 and above, you cannot use the dynamic value {{tasks.[task_name].values.[value_name]}} with a SQL session variable, because task values are supported in Python notebooks and must be set with dbutils, like below.

dbutils.jobs.taskValues.set(key = 'name', value = 'Some User')
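A downstream notebook task can then read that value back with taskValues.get. A minimal sketch, where the upstream task name task1 is an assumption for illustration:

# Hedged sketch: 'task1' is a hypothetical upstream task name; 'name' is the key set above.
value = dbutils.jobs.taskValues.get(taskKey = 'task1', key = 'name', default = 'unknown')
print(value)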

So, in your case, you need to use a combination of a SQL query task and a Python notebook to check the condition.

In your first task, of type SQL, run your query and create a table from the result, as given below.

Query

CREATE OR REPLACE TABLE maxTimestamp
AS SELECT MAX(timestamp) AS max_timestamp FROM TableA;

SELECT * FROM maxTimestamp;

This creates a table maxTimestamp. Retrieve this result in the next task using a Python notebook and set the task value there.

Code in the notebook that sets the task value:

# Read the single-row, single-column result from the table created above.
ts = spark.sql("SELECT * FROM maxTimestamp").collect()[0][0]
print(ts)
# Task values must be JSON-serializable; a TIMESTAMP comes back as a Python datetime, so cast it to a string.
dbutils.jobs.taskValues.set(key = 'max_timestamp', value = str(ts))
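Note that if TableA is empty, MAX(timestamp) returns NULL, so ts is None and str(ts) becomes the literal string 'None'; that string is what the if/else condition will see.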

Make sure you create the table and read it back in the same database (schema).

Next, add an If/else task like you had previously, and use the dynamic value {{tasks.createTaskvalue.values.max_timestamp}} in its condition, where createTaskvalue is the name of the notebook task.
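For example, a sketch of the condition fields, assuming the notebook task is named createTaskvalue and the timestamp was stored as a string as above:

Operand 1: {{tasks.createTaskvalue.values.max_timestamp}}
Operator: !=
Operand 2: None

With this, the "true" branch runs only when TableA actually produced a timestamp.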

The flow is: the SQL task, then the notebook task (here named tst, which sets the task value), then the If/else task (here named task2, which does the condition check).