With two Delta tables (tableA, tableB) as input to a streaming pipeline, I want to achieve the following:
- Processing starts when new rows appear in tableA (and not when tableB is updated).
- mergedTable = tableA.join(tableB, ...., "inner")
- Do some transformations on mergedTable.
- Based on the transformations, append new rows to tableB.
I started with the following:
tableA = spark.readStream.format("delta").load(path_to_tableA)
tableB = spark.readStream.format("delta").load(path_to_tableB)

mergedTable = tableA.join(tableB, ...., "inner")

def process_microbatch(df, batch_id):
    # ...transformations on df...
    df.write.format("delta").mode("append").save(path_to_tableB)

mergedTable.writeStream.foreachBatch(process_microbatch).start()
How can I make sure that only updates to tableA trigger microbatch processing? It is of course also important that the new rows appended to tableB are recognized by the join in the second point within the next batch.
If tableB is loaded only once at the beginning of the stream and not updated thereafter, any changes made to it within a microbatch will not be reflected in subsequent microbatches. To address this, you need to ensure that tableB is reloaded in each microbatch so that it includes the updates made in the previous microbatch. So, reload tableB within the process_microbatch function. Here is the code.
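A minimal sketch of that approach, reusing the path_to_tableA and path_to_tableB variables from the question; the join key "id" and the checkpoint_path variable are hypothetical placeholders. Only tableA is read as a stream, while tableB is reloaded with a plain batch read at the start of every microbatch:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# tableA is the only streaming source, so only new rows in tableA
# start a microbatch.
tableA = spark.readStream.format("delta").load(path_to_tableA)

def process_microbatch(batch_df, batch_id):
    # Reload tableB as a static DataFrame so that rows appended in
    # earlier microbatches are visible in this one.
    tableB = spark.read.format("delta").load(path_to_tableB)

    # Join the fresh tableA rows of this microbatch with the current
    # state of tableB ("id" is a hypothetical join key).
    merged = batch_df.join(tableB, "id", "inner")

    # ...transformations on merged...

    # Append the derived rows to tableB.
    merged.write.format("delta").mode("append").save(path_to_tableB)

(tableA.writeStream
    .foreachBatch(process_microbatch)
    .option("checkpointLocation", checkpoint_path)
    .start())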
By reloading tableB within each microbatch, you ensure that any changes made to it in previous microbatches are taken into account in subsequent microbatches.
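Because tableB is read with spark.read (a batch read) inside process_microbatch rather than with readStream, tableA remains the only streaming source in the query, so only new rows in tableA trigger a microbatch. The fresh batch read at the start of each microbatch is what makes the rows appended to tableB in the previous batch visible to the next join; this is the usual stream-static join pattern with Delta tables.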