I am new to DLT and trying to get the hang of it. I have written the code below. I have two streaming tables (`table1` and `table2`). I am creating two views (`temp1` and `temp2`) out of those tables, then joining those views with a SQL query to create a view `temp3`. Finally, I populate `final_table` from `temp3`.
```python
@dlt.view
def temp1():
    return spark.readStream.table("catalog1.schema1.table1")

@dlt.view
def temp2():
    return spark.readStream.table("catalog1.schema1.table2.filter(col("field1") == "test"")

@dlt.view
def temp3():
    return spark.sql("select * from temp1 left join temp 2 on temp1.id = temp2.id")

@dlt.create_table(
    name="final_table",
    table_properties={
        "quality": "silver",
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
    },
)
def populate_final_table():
    final_df = dlt.read("temp3")
    return final_df
```
The error I am getting is "Failed to read temp3. Dataset is not defined in the pipeline." Could anyone please help me?
The SQL query in the `temp3` view definition references `temp 2` instead of `temp2` (notice the space character), so DLT cannot resolve the dataset. Also, in the `temp2` view definition, the `filter` call has ended up inside the table-name string; move it outside the string so it is applied to the DataFrame returned by `spark.readStream.table`. Apart from this, your code looks fine to me.
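For example, the two corrected view definitions could look like this (a sketch only; it assumes the pipeline notebook already has `spark` in scope, as DLT notebooks do, and that `col` is imported from `pyspark.sql.functions`):

```python
import dlt
from pyspark.sql.functions import col

@dlt.view
def temp2():
    # Read the streaming table first, then filter the resulting DataFrame;
    # the filter must not be embedded inside the table-name string.
    return (
        spark.readStream.table("catalog1.schema1.table2")
        .filter(col("field1") == "test")
    )

@dlt.view
def temp3():
    # "temp 2" corrected to "temp2" so the join resolves the view name.
    return spark.sql(
        "select * from temp1 left join temp2 on temp1.id = temp2.id"
    )
```

With the dataset names resolving correctly, `dlt.read("temp3")` in `populate_final_table` should then find the view.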