I notice when I run the same code as my example over here but with a union, unionByName, or unionAll instead of the join, my query planning takes significantly longer and can result in a driver OOM.
Code included here for reference, with a slight difference to what occurs inside the for() loop.
from pyspark.sql import types as T, functions as F, SparkSession
spark = SparkSession.builder.getOrCreate()
schema = T.StructType([
    T.StructField("col_1", T.IntegerType(), False),
    T.StructField("col_2", T.IntegerType(), False),
    T.StructField("measure_1", T.FloatType(), False),
    T.StructField("measure_2", T.FloatType(), False),
])
data = [
    {"col_1": 1, "col_2": 2, "measure_1": 0.5, "measure_2": 1.5},
    {"col_1": 2, "col_2": 3, "measure_1": 2.5, "measure_2": 3.5}
]
df = spark.createDataFrame(data, schema)
right_schema = T.StructType([
    T.StructField("col_1", T.IntegerType(), False)
])
right_data = [
    {"col_1": 1},
    {"col_1": 1},
    {"col_1": 2},
    {"col_1": 2}
]
right_df = spark.createDataFrame(right_data, right_schema)
df = df.unionByName(df)
df = df.join(right_df, on="col_1")
df.show()
"""
+-----+-----+---------+---------+
|col_1|col_2|measure_1|measure_2|
+-----+-----+---------+---------+
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
+-----+-----+---------+---------+
"""
df.explain()
"""
== Physical Plan ==
*(6) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803]
+- *(6) SortMergeJoin [col_1#1800], [col_1#1808], Inner
   :- *(3) Sort [col_1#1800 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#5454]
   :     +- Union
   :        :- *(1) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
   :        +- *(2) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
   +- *(5) Sort [col_1#1808 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#5460]
         +- *(4) Scan ExistingRDD[col_1#1808]
"""
filter_union_cols = ["col_1", "measure_1", "col_2", "measure_2"]
df = df.withColumn("found_filter", F.lit(None))
for filter_col in filter_union_cols:
    stats = df.filter(F.col(filter_col) < F.lit(1)).drop("found_filter")
    df = df.unionByName(
        stats.select(
            "*",
            F.lit(filter_col).alias("found_filter")
        )
    )
df.show()
"""
+-----+-----+---------+---------+------------+
|col_1|col_2|measure_1|measure_2|found_filter|
+-----+-----+---------+---------+------------+
|    1|    2|      0.5|      1.5|        null|
|    1|    2|      0.5|      1.5|        null|
|    1|    2|      0.5|      1.5|        null|
|    1|    2|      0.5|      1.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    1|    2|      0.5|      1.5|   measure_1|
|    1|    2|      0.5|      1.5|   measure_1|
|    1|    2|      0.5|      1.5|   measure_1|
|    1|    2|      0.5|      1.5|   measure_1|
+-----+-----+---------+---------+------------+
"""
df.explain()
# REALLY long query plan.....
"""
== Physical Plan ==
Union
:- *(6) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, null AS found_filter#1855]
: +- *(6) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(3) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7637]
: : +- Union
: : :- *(1) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(2) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(5) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
: +- *(4) Scan ExistingRDD[col_1#1808]
:- *(12) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_1 AS found_filter#1860]
: +- *(12) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(9) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7654]
: : +- Union
: : :- *(7) Filter (col_1#1800 < 1)
: : : +- *(7) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(8) Filter (col_1#1800 < 1)
: : +- *(8) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(11) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
: +- *(10) Filter (col_1#1808 < 1)
: +- *(10) Scan ExistingRDD[col_1#1808]
:- *(18) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_1 AS found_filter#1880]
: +- *(18) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(15) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7671]
: : +- Union
: : :- *(13) Filter (measure_1#1802 < 1.0)
: : : +- *(13) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(14) Filter (measure_1#1802 < 1.0)
: : +- *(14) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(17) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(24) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_1 AS found_filter#2022]
: +- *(24) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(21) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7688]
: : +- Union
: : :- *(19) Filter ((col_1#1800 < 1) AND (measure_1#1802 < 1.0))
: : : +- *(19) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(20) Filter ((col_1#1800 < 1) AND (measure_1#1802 < 1.0))
: : +- *(20) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(23) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(30) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#1900]
: +- *(30) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(27) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7705]
: : +- Union
: : :- *(25) Filter (col_2#1801 < 1)
: : : +- *(25) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(26) Filter (col_2#1801 < 1)
: : +- *(26) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(29) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(36) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#2023]
: +- *(36) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(33) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7722]
: : +- Union
: : :- *(31) Filter ((col_1#1800 < 1) AND (col_2#1801 < 1))
: : : +- *(31) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(32) Filter ((col_1#1800 < 1) AND (col_2#1801 < 1))
: : +- *(32) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(35) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(42) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#2024]
: +- *(42) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(39) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7739]
: : +- Union
: : :- *(37) Filter ((measure_1#1802 < 1.0) AND (col_2#1801 < 1))
: : : +- *(37) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(38) Filter ((measure_1#1802 < 1.0) AND (col_2#1801 < 1))
: : +- *(38) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(41) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(48) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#2028]
: +- *(48) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(45) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7756]
: : +- Union
: : :- *(43) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1))
: : : +- *(43) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(44) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1))
: : +- *(44) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(47) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(54) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#1920]
: +- *(54) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(51) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7773]
: : +- Union
: : :- *(49) Filter (measure_2#1803 < 1.0)
: : : +- *(49) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(50) Filter (measure_2#1803 < 1.0)
: : +- *(50) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(53) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(60) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2025]
: +- *(60) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(57) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7790]
: : +- Union
: : :- *(55) Filter ((col_1#1800 < 1) AND (measure_2#1803 < 1.0))
: : : +- *(55) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(56) Filter ((col_1#1800 < 1) AND (measure_2#1803 < 1.0))
: : +- *(56) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(59) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(66) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2026]
: +- *(66) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(63) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7807]
: : +- Union
: : :- *(61) Filter ((measure_1#1802 < 1.0) AND (measure_2#1803 < 1.0))
: : : +- *(61) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(62) Filter ((measure_1#1802 < 1.0) AND (measure_2#1803 < 1.0))
: : +- *(62) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(65) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(72) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2029]
: +- *(72) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(69) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7824]
: : +- Union
: : :- *(67) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (measure_2#1803 < 1.0))
: : : +- *(67) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(68) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (measure_2#1803 < 1.0))
: : +- *(68) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(71) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(78) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2027]
: +- *(78) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(75) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7841]
: : +- Union
: : :- *(73) Filter ((col_2#1801 < 1) AND (measure_2#1803 < 1.0))
: : : +- *(73) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(74) Filter ((col_2#1801 < 1) AND (measure_2#1803 < 1.0))
: : +- *(74) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(77) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(84) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2030]
: +- *(84) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(81) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7858]
: : +- Union
: : :- *(79) Filter (((col_1#1800 < 1) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
: : : +- *(79) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(80) Filter (((col_1#1800 < 1) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
: : +- *(80) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(83) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(90) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2031]
: +- *(90) SortMergeJoin [col_1#1800], [col_1#1808], Inner
: :- *(87) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7875]
: : +- Union
: : :- *(85) Filter (((measure_1#1802 < 1.0) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
: : : +- *(85) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: : +- *(86) Filter (((measure_1#1802 < 1.0) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
: : +- *(86) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(89) Sort [col_1#1808 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
+- *(96) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2032]
+- *(96) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:- *(93) Sort [col_1#1800 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7892]
: +- Union
: :- *(91) Filter ((((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
: : +- *(91) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
: +- *(92) Filter ((((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
: +- *(92) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
+- *(95) Sort [col_1#1808 ASC NULLS FIRST], false, 0
+- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
"""
I'm seeing a significantly longer query plan here, and as the number of iterations of the for() loop increases, the performance degrades terribly.
How can I improve my performance?
This is a known limitation of iterative algorithms in Spark. At the moment, every iteration of the loop causes the inner nodes to be re-evaluated and stacked upon the outer df variable.
This means your query planning process is taking O(exp(n)), where n is the number of iterations of your loop.
There's a tool in Palantir Foundry called Transforms Verbs that can help with this.
Simply import transforms.verbs.dataframes.union_many and call it upon the total set of DataFrames you wish to materialize (assuming your logic will allow for it, i.e. one iteration of the loop doesn't depend upon the result of a prior iteration of the loop). The code above should instead be modified to:
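A minimal sketch of that pattern, not the exact code from the platform docs. It assumes each filter can be applied to the original DataFrame rather than to the accumulated one. Since transforms.verbs is only available inside Foundry and the exact call signature of union_many isn't shown here, union_many_fallback below is a hypothetical plain-PySpark stand-in built on functools.reduce and unionByName; flag_low_values is likewise an illustrative name.

```python
from functools import reduce


def union_many_fallback(*dfs):
    """Hypothetical stand-in for Foundry's union_many (assumed usage).

    Folds unionByName over DataFrames that were each built independently,
    so every input contributes exactly one branch to the plan.
    """
    return reduce(lambda left, right: left.unionByName(right), dfs)


def flag_low_values(df, filter_cols, threshold=1):
    """Build one filtered branch per column from the *original* df, then union once."""
    # Imported lazily so the helper above can be used without PySpark installed.
    from pyspark.sql import functions as F

    # Original rows, with an empty (string-typed) found_filter column.
    base = df.withColumn("found_filter", F.lit(None).cast("string"))

    # Each branch reads df directly, never the growing union.
    branches = [
        df.filter(F.col(c) < F.lit(threshold)).withColumn("found_filter", F.lit(c))
        for c in filter_cols
    ]
    return union_many_fallback(base, *branches)
```

With the joined df from the question (before found_filter is added), the call would be flag_low_values(df, filter_union_cols). Inside Foundry, you would presumably swap union_many_fallback for the real union_many.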
This will optimize your unions and take significantly less time.
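As a rough illustration of why the loop version blows up, here is a toy model in plain Python. It is not Spark's actual planner, and the function names are made up for this sketch. If every pass unions df with a filtered copy of itself, the number of branches doubles each time, which matches the 16 Project branches in the plan above for 4 columns. Building each filter from the original DataFrame adds only one branch per pass.

```python
def branches_when_chained(n_iterations):
    """Model of df = df.unionByName(filter(df)) inside the loop."""
    branches = 1
    for _ in range(n_iterations):
        # The filtered copy carries every branch df already has.
        branches += branches
    return branches


def branches_when_independent(n_iterations):
    """Model of filtering the original df each time and unioning once."""
    return 1 + n_iterations


for n in (4, 10, 20):
    print(n, branches_when_chained(n), branches_when_independent(n))
# 4 16 5
# 10 1024 11
# 20 1048576 21
```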
The bottom line: beware of using any union calls inside for/while loops. If you must use this behavior, use the transforms.verbs.dataframes.union_many verb to optimize your final set of DataFrames.
Check out your platform documentation for more information and more helpful Verbs.
Protip: Use the included optimization over here to further increase your performance.