Loading data into the Gold table of a DLT pipeline from the Silver table


Background: I have created a DLT pipeline with Bronze, Silver and Gold tables. Bronze and Silver are streaming tables, and Gold is a T&L table. The pipeline runs fine with incremental data.

Issue: Recently we had a requirement to load history data. To do that, I read the data manually from the source using spark.read() and wrote it to the Silver table. The version history of the Silver table looks good, and I can query the history data in the Silver table. The problem is in the Gold layer: when the Gold table reads from the Silver table, it reads only the incremental data and not the history data that I loaded manually.
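For reference, the version check mentioned above can be done with Delta's DESCRIBE HISTORY command. The following is only a minimal sketch: describe_history is a hypothetical helper name, and it assumes `spark` is an active SparkSession on a cluster where the Silver table is a Delta table.

```python
def describe_history(spark, table_name):
    """Hypothetical helper: return the Delta commit history of `table_name`.

    `spark` is assumed to be an active SparkSession with Delta support.
    """
    # DESCRIBE HISTORY returns one row per table version, newest first,
    # including the operation (e.g. WRITE, STREAMING UPDATE) that created it.
    return spark.sql(f"DESCRIBE HISTORY {table_name}")
```

Calling describe_history(spark, "db_device.Refined_Catalog").select("version", "timestamp", "operation").show() should list the manual append as its own version next to the versions written by the pipeline.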

Screenshot of how I loaded the data into the Silver table: Loading of history data

Code:

from pyspark.sql.functions import lit

# Read the history data from the source
df_cat = spark.read.json('<source_path>')
# Register a temp view so the data can be queried with SQL
df_cat.createOrReplaceTempView("catalog")

# Select the columns required by the silver table and add the load-date columns
df = spark.sql('''
    select contactKey, optIn, v, date, practiceName, address, city, state, zip, country,
           contactName, contactPhone, contactEmail, contactFirstName, contactLastName,
           legalRepresentativeName,
           to_timestamp("null") as S3LoadDate,
           to_timestamp("null") as LambdaLoadDate,
           to_timestamp("null") as EventHubLoadDate,
           to_timestamp(current_timestamp(), "MM-dd-yyyy HH mm ss SSS") as RawLoadDate,
           to_timestamp(current_timestamp(), "MM-dd-yyyy HH mm ss SSS") as RefinedLoadDate,
           to_date(current_timestamp(), "yyyy-MM-dd") as ProcessDate
    from catalog
''')

# Append the history data to the silver table
df.write.format("delta").mode("append").insertInto("db_device.Refined_Catalog")
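The select statement above is long enough that a missing or misordered column is easy to overlook, which matters because insertInto matches columns by position. One way to keep it auditable is to assemble the query from explicit column lists. This is a sketch only: build_history_select and the two list names are hypothetical, and it assumes that to_timestamp("null") is meant to produce a null timestamp and that the formatted current_timestamp() expressions are meant to yield the current time and date.

```python
# Columns copied from the query in the question, in the same order.
SOURCE_COLUMNS = [
    "contactKey", "optIn", "v", "date", "practiceName", "address", "city",
    "state", "zip", "country", "contactName", "contactPhone", "contactEmail",
    "contactFirstName", "contactLastName", "legalRepresentativeName",
]

# Load-date columns that have no value for a manual history load.
NULL_TIMESTAMP_COLUMNS = ["S3LoadDate", "LambdaLoadDate", "EventHubLoadDate"]


def build_history_select(view_name: str = "catalog") -> str:
    """Hypothetical helper: assemble the SELECT used for the history load."""
    parts = list(SOURCE_COLUMNS)
    # Assumption: a typed NULL replaces to_timestamp("null").
    parts += [f"cast(null as timestamp) as {c}" for c in NULL_TIMESTAMP_COLUMNS]
    # Assumption: these columns should simply hold the load time and date.
    parts += [
        "current_timestamp() as RawLoadDate",
        "current_timestamp() as RefinedLoadDate",
        "current_date() as ProcessDate",
    ]
    return "select " + ", ".join(parts) + " from " + view_name
```

The resulting string can be passed to spark.sql() in place of the literal query, for example df = spark.sql(build_history_select()).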

Screenshot of the DLT pipeline: DLT Pipeline

Expected output: When the DLT pipeline runs after the history data has been loaded into the Silver table, the Gold table should also read that history data from the Silver table.
