Spark optimize "DataFrame.explain" / Catalyst


I've got a complex application that runs really complex SQL queries (well, not queries exactly, Spark plans). The plans are dynamic: they change based on user input, so I can't "cache" them.

I've got a phase in which Spark takes 1.5-2 minutes building the plan. Just to make sure, I added "logXXX", then explain(true), then "logYYY", and the explain alone takes 1 minute 20 seconds to execute.
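A minimal sketch of that kind of measurement, in plain Python so it runs without a cluster. The `timed` helper and the `timings` dict are hypothetical names, and the `time.sleep` call is only a stand-in for the `explain(true)` call being measured.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, sink):
    """Record the wall-clock seconds spent inside the block under sink[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        sink[label] = time.perf_counter() - start


timings = {}
with timed("explain", timings):
    # Stand-in (assumption): in the real job this would be the explain(true) call.
    time.sleep(0.05)

print(f"explain took {timings['explain']:.2f}s")
```

Wrapping only the explain call this way separates plan-building time from everything logged around it.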

I've tried breaking the lineage, but this seems to make overall performance worse because the actual execution time becomes longer.

I can't parallelize the driver work any further (I already did where possible, but this task can't be overlapped with anything else).

Any ideas or guides on how to speed up plan building in Spark? (For example, flags to try enabling or disabling.)

Is there a way to cache plans in Spark? (so I can run that in parallel and then execute it)

I've tried disabling all the optimizer rules I could and setting min iterations to 30, but nothing seems to affect that particular step :S

I tried disabling wholeStageCodegen and it helped a little, but then the execution takes longer :).

Thanks!

PS: The plan contains multiple unions (fewer than 20, but with quite complex plans inside each union), which are the cause of the long planning time, but splitting them apart also hurts execution time.

There is 1 answer

BiS On

Just in case it helps someone (and in case no one provides more insights).

I couldn't manage to reduce the optimizer time (and I'm not sure reducing it would be good anyway, as I might lose execution time), so I restructured the job instead.

One of the last parts of my plan was scanning two big tables and getting one column from each of them (using windows, aggregations, etc.).

So I split my code into two parts:

1- The big plan (cached)
2- The small plan which scans and aggregates two big tables (cached)

And added one more part:

3- Left join/enrich the big plan with the output of "2" (this takes about 10 seconds, as the dataset is not that big) and finish the remaining computation.

Now I launch both actions (1 and 2) in parallel (using driver-level parallelism/threads), cache the resulting DataFrames, wait for both to finish, and then perform 3.

With this, while the Spark driver (thread 1) is calculating the big plan (~2 minutes), the executors are executing part "2" (which has a small plan, but big scans/shuffles), and then both get "mixed" in about 10-15 seconds. That is a good improvement in execution time, because the 1:30 spent calculating the plan now overlaps with useful executor work.
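The launch-in-parallel-then-join pattern can be sketched roughly like this. It is not my actual job, only an illustration in plain Python: `run_big_plan`, `run_small_plan` and `join_parts` are hypothetical stand-ins, the sleeps replace the real planning and scanning work, and in a real job each function would build its DataFrame, cache it, and trigger an action so the work actually happens inside its thread.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_big_plan():
    # Stand-in for part 1 (assumption): the driver is busy building the big plan.
    time.sleep(0.4)
    return {"k1": "big-a", "k2": "big-b"}


def run_small_plan():
    # Stand-in for part 2 (assumption): small plan, heavy scans on the executors.
    time.sleep(0.3)
    return {"k1": 10}


def join_parts(big, small):
    # Stand-in for part 3: left join, keeping every key of the big result.
    return {key: (value, small.get(key)) for key, value in big.items()}


def run_pipeline():
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Submit both parts first so they overlap, then wait for both.
        big_future = pool.submit(run_big_plan)
        small_future = pool.submit(run_small_plan)
        big = big_future.result()
        small = small_future.result()
    joined = join_parts(big, small)
    return joined, time.perf_counter() - start


result, elapsed = run_pipeline()
print(result, f"{elapsed:.2f}s")
```

The total wall-clock time is close to the slower of the two parts rather than their sum, which is the whole point of the split.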

Comparing times:

Before I would have

1:30 Spark optimizing time + 6 minutes execution time

Now I have

max(
  1:30 Spark optimizing time + 4 minutes execution time,
  0:02 Spark optimizing time + 2 minutes execution time
) + 15 seconds joining both parts
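Plugging the figures above into a few lines of Python makes the totals explicit. The helper names `seconds` and `fmt` are just for illustration, and the inputs are the approximate timings quoted above.

```python
def seconds(minutes, secs=0):
    """Convert minutes and seconds to total seconds."""
    return minutes * 60 + secs


def fmt(total):
    """Format a number of seconds as m:ss."""
    return f"{total // 60}:{total % 60:02d}"


# Before: optimizing and execution happen one after the other.
before = seconds(1, 30) + seconds(6)

# Now: the two branches run in parallel, so only the slower one counts,
# plus the final join.
big_branch = seconds(1, 30) + seconds(4)
small_branch = seconds(0, 2) + seconds(2)
after = max(big_branch, small_branch) + 15

saved = before - after
print(fmt(before), fmt(after), fmt(saved))  # prints "7:30 5:45 1:45"
```

So the run goes from roughly 7:30 to 5:45, about 1:45 saved per execution.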

Not so much, but quite a few "expensive" people will be waiting for it to finish :)