My goal is to continuously ingest incoming Parquet files into Delta Lake, run queries over the data, and expose the results through a REST API. All files live in S3 buckets.
// listen for new Parquet files arriving in the source bucket
// (a streaming file source needs an explicit schema unless spark.sql.streaming.schemaInference is enabled;
//  inputSchema below stands for the schema of the incoming files)
val df = spark.readStream
  .schema(inputSchema)
  .parquet("s3a://myBucket/folder")

// write the changes into a Delta table
df.writeStream
  .format("delta")
  .option("checkpointLocation", "s3a://myBucket-processed/checkpoint")
  .start("s3a://myBucket-processed/")
  .awaitTermination() // blocking call, so it runs in a separate thread
// this doesn't work: .show() is not supported on a streaming DataFrame
val query = df.select(convertedColumnNames)
query.show()

// another failing attempt:
spark.readStream
  .format("delta")
  .load("s3a://myBucket-processed/")
  .select(convertedColumnNames)
  .show()
// org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
How can I get the filtered data out of Delta Lake?
Have you tried using foreachBatch?
It brings the batch DataFrame API into Structured Streaming, and it also gives you some control over the number of files you write into Delta Lake per micro-batch.
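Roughly something like this (a sketch, not tested against your setup; it reuses df and convertedColumnNames from your snippets, assumes convertedColumnNames is a Seq[Column], and the coalesce(4) is just an example value):

import org.apache.spark.sql.DataFrame

// df is the streaming DataFrame read from s3a://myBucket/folder
df.writeStream
  .option("checkpointLocation", "s3a://myBucket-processed/checkpoint")
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    // inside foreachBatch each micro-batch is a plain batch DataFrame,
    // so select/filter/show and the regular batch writer all work
    batchDf
      .select(convertedColumnNames: _*)
      .coalesce(4) // example value: limits the number of files written per micro-batch
      .write
      .format("delta")
      .mode("append")
      .save("s3a://myBucket-processed/")
  }
  .start()
  .awaitTermination()

Once the data is in the Delta table, your REST layer can read it back as a normal batch DataFrame with spark.read.format("delta").load("s3a://myBucket-processed/"), which avoids the "streaming sources must be executed with writeStream.start()" error you hit with readStream.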