Executing Spark SQL in Delta Live Tables


I am new to DLT and trying to get the hang of it. I have written the code below. I have two streaming tables, and I am creating two views (temp1 and temp2) over them. I am then joining those views with a SQL query to create a third view, temp3, and finally populating final_table from temp3.

import dlt
from pyspark.sql.functions import col

@dlt.view
def temp1():
    return spark.readStream.table("catalog1.schema1.table1")

@dlt.view
def temp2():
    return spark.readStream.table("catalog1.schema1.table2.filter(col('field1') == 'test')")

@dlt.view
def temp3():
    return spark.sql("select * from temp1 left join temp 2 on temp1.id = temp2.id")

@dlt.create_table(
    name="final_table",
    table_properties={
        "quality": "silver",
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
    },
)
def populate_final_table():
    final_df = dlt.read("temp3")
    return final_df

The error I am getting here is "failed to read temp3. Dataset is not defined in the pipeline." Could anyone please help me?


There are 2 answers

Vikas Sharma

The SQL query in the temp3 view definition references temp 2 instead of temp2 (notice the space character).
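With the space removed, the temp3 definition from the question would read:

@dlt.view
def temp3():
    return spark.sql("select * from temp1 left join temp2 on temp1.id = temp2.id")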

Also, in the temp2 view definition, the filter expression has ended up inside the table name string; move it outside the table() call as follows:

@dlt.view
def temp2():
    return spark.readStream.table("catalog1.schema1.table2").filter(col("field1") == "test")

Apart from this, your code looks fine to me.

SreeVik

I was able to get this code working by adding the LIVE. prefix to the view names in the query, so the query looks like this:

@dlt.view
def temp3():
    return spark.sql("select * from live.temp1 temp1 left join live.temp2 temp2 on temp1.id = temp2.id")

I referred to the documentation here.
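Putting both answers together, a sketch of the corrected pipeline could look like this (all table, column, and property names are taken from the question):

import dlt
from pyspark.sql.functions import col

@dlt.view
def temp1():
    return spark.readStream.table("catalog1.schema1.table1")

@dlt.view
def temp2():
    # Filter applied to the DataFrame, not embedded in the table name string
    return spark.readStream.table("catalog1.schema1.table2").filter(col("field1") == "test")

@dlt.view
def temp3():
    # Views defined in the same pipeline are referenced with the live. prefix
    return spark.sql("select * from live.temp1 temp1 left join live.temp2 temp2 on temp1.id = temp2.id")

@dlt.create_table(
    name="final_table",
    table_properties={
        "quality": "silver",
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
    },
)
def populate_final_table():
    return dlt.read("temp3")

One thing to watch: select * on this join returns an id column from each side, so you may need to select or alias the columns explicitly before writing them to final_table.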