Does having a large number of parquet files cause memory overhead when reading with Spark?


I have a Spark job, and every time I do a historical fix it fails because the memory limit is exceeded. During a historical fix, I create a table in my temporary database and read it from there (for the first time).
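For context, the read during the historical fix looks roughly like this (a minimal sketch only; the database/table name and the filter are placeholders inferred from the logs further down, not my actual code):

import org.apache.spark.sql.SparkSession

// Sketch of the historical-fix read path; names are placeholders.
val spark = SparkSession.builder()
  .appName("historical-fix")
  .enableHiveSupport()
  .getOrCreate()

// The temporary table is created beforehand, then read once, roughly like this:
val df = spark.table("tempdb.some_table")
  .filter("scd_end IS NOT NULL")  // corresponds to the noteq(scd_end, null) predicate in the logs

// Downstream processing of df happens after this point.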

I am unable to figure out exactly what the reason is. The only difference I can see is that the number of parquet files is about 10 times higher during the historical fix run than during the normal run (see the table stats below).

Historical Fix Table Stats

Normal Table Stats
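The screenshots above show the table stats for the two cases. The same comparison can be reproduced with something like the following sketch (the HDFS path is the table location that appears in the FileScanRDD log lines below):

import org.apache.hadoop.fs.{FileSystem, Path}

// List the data files under the table location and summarise count and size.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val tableDir = new Path("hdfs://namenode/warehouse/tablespace/external/hive/tempdb.db/some_table")

val files = fs.listStatus(tableDir).filter(_.isFile)
val totalBytes = files.map(_.getLen).sum
println(s"file count         = ${files.length}")
println(s"total size (MB)    = ${totalBytes / (1024 * 1024)}")
println(s"avg file size (MB) = ${totalBytes / files.length / (1024 * 1024)}")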

When I check the Spark history server, I find the following logs in stderr:

24/03/26 16:12:23 INFO compress.CodecPool: Got brand-new compressor [.snappy]
24/03/26 16:12:24 INFO datasources.FileScanRDD: Reading File path: hdfs://namenode/warehouse/tablespace/external/hive/tempdb.db/some_table/eb4ce3cb9d5fa3a0-cffd3b5a00000004_650415028_data.17.parq, range: 0-125039486, partition values: [empty row]
24/03/26 16:12:24 INFO compat.FilterCompat: Filtering using predicate: noteq(scd_end, null)
24/03/26 16:12:24 INFO compat.FilterCompat: Filtering using predicate: noteq(scd_end, null)
24/03/26 16:12:24 INFO compat.FilterCompat: Filtering using predicate: noteq(scd_end, null)
24/03/26 16:12:40 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 510
24/03/26 16:12:40 INFO executor.Executor: Running task 5.2 in stage 20.0 (TID 510)
24/03/26 16:12:40 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2
24/03/26 16:12:40 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
24/03/26 16:12:40 INFO datasources.FileScanRDD: Reading File path: hdfs://namenode/warehouse/tablespace/external/hive/tempdb.db/some_table/eb4ce3cb9d5fa3a0-cffd3b5a00000000_1522645193_data.10.parq, range: 0-131168664, partition values: [empty row]
24/03/26 16:12:40 INFO compat.FilterCompat: Filtering using predicate: noteq(scd_end, null)
24/03/26 16:12:40 INFO compat.FilterCompat: Filtering using predicate: noteq(scd_end, null)
24/03/26 16:12:40 INFO compat.FilterCompat: Filtering using predicate: noteq(scd_end, null)
24/03/26 16:12:41 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
24/03/26 16:12:46 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

Comparing the execution flows, I can see that in the normal run Spark is able to scan the parquet files and report the number of rows fetched, whereas that is not the case in the historical fix run.

Historical Fix Run

Normal Table Run

I want to understand whether I am thinking in the right direction. Based on the above logs and the DAG visualisation, it seems to me that the huge number of files is causing overhead for Spark and making it exceed its memory limit.
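One thing that could be checked to test this hypothesis is how many input partitions the scan produces in each run; roughly something like this (a sketch reusing the placeholder table name from above):

// Number of partitions Spark plans for the scan of the temp table; with ten
// times more (smaller) files, I would expect this to differ between the runs.
val scanned = spark.table("tempdb.some_table").filter("scd_end IS NOT NULL")
println(s"input partitions = ${scanned.rdd.getNumPartitions}")

// Maximum number of bytes Spark packs into a single scan partition.
println(spark.conf.get("spark.sql.files.maxPartitionBytes"))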

Is my understanding correct, or am I thinking and debugging in completely the wrong direction? Tips for debugging memory-related issues in Spark are greatly welcome (I am fairly new to Spark).
