What is the function of microBatchDF._jdf.sparkSession().sql in this code?
def upsert_to_delta(microBatchDF, batchId):
    microBatchDF.createOrReplaceTempView("updates")
    microBatchDF._jdf.sparkSession().sql("""
        MERGE INTO silver s
        USING updates u
        ON s.mrn = u.mrn
        WHEN MATCHED AND s.dob <> u.dob OR
                         s.sex <> u.sex OR
                         s.gender <> u.gender OR
                         s.first_name <> u.first_name OR
                         s.last_name <> u.last_name OR
                         s.street_address <> u.street_address OR
                         s.zip <> u.zip OR
                         s.city <> u.city OR
                         s.state <> u.state OR
                         s.updated <> u.updated
            THEN UPDATE SET *
        WHEN NOT MATCHED
            THEN INSERT *
    """)
query = (spark.readStream
              .table("bronze")
              .writeStream
              .foreachBatch(upsert_to_delta)
              .trigger(availableNow=True)
              # .trigger(processingTime='5 seconds')
              .start())
DA.block_until_stream_is_ready(query)
The above is an upsert into a silver table using a streaming read against the bronze table, matching on the unique identifier mrn.
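
For context on the call in question: microBatchDF._jdf.sparkSession() reaches through the DataFrame's underlying JVM object to get the Spark session that owns the micro-batch, and .sql(...) runs the MERGE on that session. Temp views are session-scoped, and the session used inside foreachBatch may not be the same one as the global spark variable, so running the statement on the batch's own session is what lets the "updates" view resolve. The sketch below shows the same pattern using the public sparkSession property instead of the private _jdf handle. It assumes PySpark 3.3 or later and an existing Delta table named silver; the MERGE_SQL name and the shortened match condition are for illustration only and are not part of the original code.

```python
# Sketch only: assumes PySpark 3.3+ (public DataFrame.sparkSession property)
# and that a Delta table named "silver" already exists.
# MERGE_SQL is a hypothetical name; the WHEN MATCHED condition is shortened
# here for illustration and would list every compared column in practice.
MERGE_SQL = """
    MERGE INTO silver s
    USING updates u
    ON s.mrn = u.mrn
    WHEN MATCHED AND (s.dob <> u.dob OR s.updated <> u.updated)
        THEN UPDATE SET *
    WHEN NOT MATCHED
        THEN INSERT *
"""


def upsert_to_delta(microBatchDF, batchId):
    # Register the micro-batch as a temp view. The view is scoped to the
    # session that owns microBatchDF.
    microBatchDF.createOrReplaceTempView("updates")
    # Run the MERGE on that same session so the "updates" view resolves.
    microBatchDF.sparkSession.sql(MERGE_SQL)
```

The function is passed to .foreachBatch(upsert_to_delta) exactly as in the original; only the way the session is obtained differs.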