I hope everyone is doing well. I am working on a stream-stream join Spark app and I have a few doubts about it. My data sources are file-based, and the output is written to an Iceberg table.
So, the idea is to left join the entries of the files that arrive in these two prefixes.
My approach was to create two temporary views pointing to the sources, each one adding its respective watermark, and then execute a SQL join over them to obtain the streaming query object.
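Roughly, the setup looks like the sketch below (the schema, paths, watermark column, and join condition are simplified placeholders for illustration, not the exact job):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession.builder.getOrCreate()

# Placeholder schema; file sources need an explicit schema when streaming
schema = StructType([
    StructField("join_key", StringType()),
    StructField("event_time", TimestampType()),
])

left = (spark.readStream.format("parquet").schema(schema)
        .load("s3://test/source_path_1/")
        .withWatermark("event_time", "1 hour"))
right = (spark.readStream.format("parquet").schema(schema)
         .load("s3://test/source_path_2/")
         .withWatermark("event_time", "1 hour"))

left.createOrReplaceTempView("left_src")
right.createOrReplaceTempView("right_src")

# Stream-stream outer joins need an event-time constraint so Spark knows
# when it can emit unmatched left-side rows
streamingQuery = spark.sql("""
    SELECT l.*, r.join_key AS right_key
    FROM left_src l
    LEFT JOIN right_src r
      ON l.join_key = r.join_key
     AND r.event_time BETWEEN l.event_time - INTERVAL 1 HOUR
                          AND l.event_time + INTERVAL 1 HOUR
""")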
And for the write stream, I have the following options:
(streamingQuery
    .writeStream
    .option("checkpointLocation", "path_in_s3")
    .trigger(processingTime="60 seconds")
    .foreachBatch(custom_writer_func)
    .start()
    .awaitTermination())
As for custom_writer_func, it only broadcasts a dataset used in a UDF and then writes to the Iceberg table in append mode.
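A rough sketch of what custom_writer_func does (the side dataset, the UDF, and the table name are placeholders I'm using for illustration):

from pyspark.sql import functions as F

# Hypothetical side dataset used by the UDF
lookup_df = spark.read.parquet("s3://test/lookup/")

def custom_writer_func(batch_df, batch_id):
    # Broadcast the side dataset so the UDF can look values up on executors
    lookup_bc = spark.sparkContext.broadcast(
        {row["key"]: row["value"] for row in lookup_df.collect()}
    )
    enrich = F.udf(lambda k: lookup_bc.value.get(k), "string")
    (batch_df
     .withColumn("enriched", enrich(F.col("join_key")))
     .writeTo("my_catalog.db.target_table")  # Iceberg table, append per batch
     .append())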
Given that, I am having two main problems:
1 - For some reason that I don't understand, the join isn't writing any data: every commit to the Iceberg table appends 0 new files. Since it is a left join, it should write at least the rows from the left side.
2 - I am testing in a staging environment where there is a finite set of files in the source prefixes, and with source archiving enabled, Spark tries to move some of the source files twice.
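For context, archiving is enabled through the file source's cleanSource option, roughly like this (option names per the Spark file-source documentation; paths inferred from the error below):

archived_stream = (spark.readStream.format("parquet").schema(schema)
                   .option("cleanSource", "archive")
                   .option("sourceArchiveDir", "s3://test/archive/")
                   .load("s3://test/source_path_1/"))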
Error ref:
23/10/17 14:53:12 WARN FileStreamSource$SourceFileArchiver: Fail to move s3://test/source_path_1/year=2023/month=10/day=10/hour=00/file_table_1 to s3://test/archive/source_path_1/year=2023/month=10/day=10/hour=00/file_table_1 / skip moving file.
java.io.FileNotFoundException: No such file or directory: s3://test/source_path_1/year=2023/month=10/day=10/hour=00/file_table_1
FTR, I am running Spark on Kubernetes with Iceberg 0.12.1 and Apache Spark 3.3.0.
Could anyone help me understand what is happening here? If any further information is needed, I will be happy to provide it.
Thanks!