I have the following code I wish to use for copying data arriving in a folder on a regular basis:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger

val streamingQuery = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .schema("`path` STRING, `modificationTime` TIMESTAMP, `length` BIGINT, `content` BINARY")
  .option("recursiveFileLookup", "true")
  .load("my path here")
  .filter(col("modificationTime") > "2023-10-30 07:00:00")
  .writeStream
  .trigger(Trigger.AvailableNow())
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // my code goes here on how I copy files
  }
  .option("checkpointLocation", "my path here")
  .start()

streamingQuery.awaitTermination()
My question: what exactly will be saved in the checkpoint — everything that has been read from the folder, or only the files that pass the modification-time filter? I am asking because I would like Spark to avoid re-reading all of the files in the folder (which is why I want the checkpoint in the first place), but I also do not want to copy the files that predate the cutoff, since they are not needed.
With this approach, Auto Loader will still scan and checkpoint every file it discovers, and the filtering only happens afterwards in Spark. It's better to use the modifiedAfter option of Databricks Auto Loader to specify the cutoff timestamp — then the filtering is done during the file scan itself, and older files never become part of the stream's input.

P.S. Another thing that can help with performance is using file notification mode instead of the default directory listing mode, although it may require additional permissions on the storage account.
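For illustration, here is a minimal sketch of the suggested change applied to your code. The paths and the copy logic are placeholders carried over from your question; modifiedAfter takes a timestamp string (the documented format is YYYY-MM-DDTHH:mm:ss), and I've used your cutoff as an assumed example value:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

val query = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  // Filter during the scan itself: files modified at or before this
  // timestamp are never listed as input, so they are neither read nor
  // recorded in the checkpoint.
  .option("modifiedAfter", "2023-10-30T07:00:00")
  // Optional: file notification mode instead of directory listing
  // (requires extra permissions on the storage account).
  // .option("cloudFiles.useNotifications", "true")
  .schema("`path` STRING, `modificationTime` TIMESTAMP, `length` BIGINT, `content` BINARY")
  .option("recursiveFileLookup", "true")
  .load("my path here")
  .writeStream
  .trigger(Trigger.AvailableNow())
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // my code goes here on how I copy files
  }
  .option("checkpointLocation", "my path here")
  .start()

query.awaitTermination()

Note that the .filter(col("modificationTime") > ...) step is gone entirely — with modifiedAfter the cutoff is enforced before files are ever listed, rather than after they have been read.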