I have recently converted an enormous SAS DATA step program to PySpark, and I think the query is so large that the Catalyst optimizer causes an OOM error in the driver. I am able to run the query when I increase the driver memory to 256 GB, but with anything less the job fails. This happens even when I run on a dataset with very few records.
This query takes a single input dataset and performs transformations on the input columns to generate a new set of columns. There are no joins, just thousands of intermediate calculations to produce a final dataset with ~800 columns.
How can I structure such a large query so that Spark can run it with fewer compute resources? I am being deliberately vague, but the new columns I am producing essentially use F.when and some array operations on columns created from F.split.
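
For illustration, the generated expressions look roughly like the following. The column names here are hypothetical placeholders, not the ones from the real program:

from pyspark.sql import functions as F

# Hypothetical examples of the kinds of expressions being generated;
# the real program builds thousands of these programmatically.
parts = F.split(F.col("raw_code"), "-")            # array column from a delimited string
prefix = parts.getItem(0).alias("code_prefix")     # array element access
flag = (
    F.when(F.col("amount") > 0, F.lit("POS"))
     .when(F.col("amount") < 0, F.lit("NEG"))
     .otherwise(F.lit("ZERO"))
     .alias("amount_flag")
)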
My final action looks like the snippet below and produces an enormous logical plan.
cols = [<list of 800 column expressions>]
df.select(*cols).write.parquet("<path/to/file>")
I have read plenty about checkpointing and how it truncates the logical plan. Does the plan need to be sent to the executors, and could it be too big to fit in their memory? What is a best-practice way to structure an enormous query like this? My first thought is to break it up into many smaller queries and then do a join at the end.
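
For concreteness, this is the kind of checkpoint-based restructuring I have been considering. It is only a sketch: the checkpoint directory, the batch size, and the assumption that each batch of expressions depends only on the original input columns (not on columns produced in earlier batches) are all placeholders that would need to be adapted.

# Sketch only: assumes `spark`, `df`, and `cols` (the ~800 expressions) already exist,
# and that expressions in a batch depend only on the original input columns.
spark.sparkContext.setCheckpointDir("<path/to/checkpoint_dir>")  # required for df.checkpoint()

batch_size = 100  # placeholder; would need tuning
result = df
for i in range(0, len(cols), batch_size):
    batch = cols[i:i + batch_size]
    # Keep everything computed so far, add this batch of columns, then checkpoint
    # to materialize the data and truncate the logical plan.
    result = result.select("*", *batch).checkpoint()

# The original input columns could be dropped with a final select before writing.
result.write.parquet("<path/to/file>")

Whether something like this is better than writing each piece to parquet and joining on a row key at the end is exactly what I am unsure about.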