Why is my build hanging / taking a long time to generate my query plan with many unions?


I notice that when I run the same code as in my example over here, but with a union, unionByName, or unionAll instead of the join, query planning takes significantly longer and can result in a driver OOM.

The code is included here for reference, with a slight difference in what occurs inside the for() loop.

from pyspark.sql import types as T, functions as F, SparkSession
spark = SparkSession.builder.getOrCreate()

schema = T.StructType([
  T.StructField("col_1", T.IntegerType(), False),
  T.StructField("col_2", T.IntegerType(), False),
  T.StructField("measure_1", T.FloatType(), False),
  T.StructField("measure_2", T.FloatType(), False),
])
data = [
  {"col_1": 1, "col_2": 2, "measure_1": 0.5, "measure_2": 1.5},
  {"col_1": 2, "col_2": 3, "measure_1": 2.5, "measure_2": 3.5}
]

df = spark.createDataFrame(data, schema)

right_schema = T.StructType([
  T.StructField("col_1", T.IntegerType(), False)
])
right_data = [
  {"col_1": 1},
  {"col_1": 1},
  {"col_1": 2},
  {"col_1": 2}
]
right_df = spark.createDataFrame(right_data, right_schema)

df = df.unionByName(df)
df = df.join(right_df, on="col_1")
df.show()

"""
+-----+-----+---------+---------+
|col_1|col_2|measure_1|measure_2|
+-----+-----+---------+---------+
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
+-----+-----+---------+---------+
"""

df.explain()

"""
== Physical Plan ==
*(6) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803]
+- *(6) SortMergeJoin [col_1#1800], [col_1#1808], Inner
   :- *(3) Sort [col_1#1800 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#5454]
   :     +- Union
   :        :- *(1) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
   :        +- *(2) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
   +- *(5) Sort [col_1#1808 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#5460]
         +- *(4) Scan ExistingRDD[col_1#1808]
"""

filter_union_cols = ["col_1", "measure_1", "col_2", "measure_2"]
df = df.withColumn("found_filter", F.lit(None))
for filter_col in filter_union_cols:
  stats = df.filter(F.col(filter_col) < F.lit(1)).drop("found_filter")
  df = df.unionByName(
    stats.select(
      "*",
      F.lit(filter_col).alias("found_filter")
    )
  )

df.show()

"""
+-----+-----+---------+---------+------------+                                  
|col_1|col_2|measure_1|measure_2|found_filter|
+-----+-----+---------+---------+------------+
|    1|    2|      0.5|      1.5|        null|
|    1|    2|      0.5|      1.5|        null|
|    1|    2|      0.5|      1.5|        null|
|    1|    2|      0.5|      1.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    2|    3|      2.5|      3.5|        null|
|    1|    2|      0.5|      1.5|   measure_1|
|    1|    2|      0.5|      1.5|   measure_1|
|    1|    2|      0.5|      1.5|   measure_1|
|    1|    2|      0.5|      1.5|   measure_1|
+-----+-----+---------+---------+------------+
"""

df.explain()

# REALLY long query plan.....

"""
== Physical Plan ==
Union
:- *(6) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, null AS found_filter#1855]
:  +- *(6) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(3) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7637]
:     :     +- Union
:     :        :- *(1) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(2) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(5) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:           +- *(4) Scan ExistingRDD[col_1#1808]
:- *(12) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_1 AS found_filter#1860]
:  +- *(12) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(9) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7654]
:     :     +- Union
:     :        :- *(7) Filter (col_1#1800 < 1)
:     :        :  +- *(7) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(8) Filter (col_1#1800 < 1)
:     :           +- *(8) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(11) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:           +- *(10) Filter (col_1#1808 < 1)
:              +- *(10) Scan ExistingRDD[col_1#1808]
:- *(18) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_1 AS found_filter#1880]
:  +- *(18) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(15) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7671]
:     :     +- Union
:     :        :- *(13) Filter (measure_1#1802 < 1.0)
:     :        :  +- *(13) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(14) Filter (measure_1#1802 < 1.0)
:     :           +- *(14) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(17) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(24) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_1 AS found_filter#2022]
:  +- *(24) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(21) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7688]
:     :     +- Union
:     :        :- *(19) Filter ((col_1#1800 < 1) AND (measure_1#1802 < 1.0))
:     :        :  +- *(19) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(20) Filter ((col_1#1800 < 1) AND (measure_1#1802 < 1.0))
:     :           +- *(20) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(23) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(30) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#1900]
:  +- *(30) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(27) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7705]
:     :     +- Union
:     :        :- *(25) Filter (col_2#1801 < 1)
:     :        :  +- *(25) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(26) Filter (col_2#1801 < 1)
:     :           +- *(26) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(29) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(36) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#2023]
:  +- *(36) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(33) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7722]
:     :     +- Union
:     :        :- *(31) Filter ((col_1#1800 < 1) AND (col_2#1801 < 1))
:     :        :  +- *(31) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(32) Filter ((col_1#1800 < 1) AND (col_2#1801 < 1))
:     :           +- *(32) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(35) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(42) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#2024]
:  +- *(42) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(39) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7739]
:     :     +- Union
:     :        :- *(37) Filter ((measure_1#1802 < 1.0) AND (col_2#1801 < 1))
:     :        :  +- *(37) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(38) Filter ((measure_1#1802 < 1.0) AND (col_2#1801 < 1))
:     :           +- *(38) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(41) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(48) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, col_2 AS found_filter#2028]
:  +- *(48) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(45) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7756]
:     :     +- Union
:     :        :- *(43) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1))
:     :        :  +- *(43) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(44) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1))
:     :           +- *(44) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(47) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(54) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#1920]
:  +- *(54) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(51) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7773]
:     :     +- Union
:     :        :- *(49) Filter (measure_2#1803 < 1.0)
:     :        :  +- *(49) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(50) Filter (measure_2#1803 < 1.0)
:     :           +- *(50) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(53) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(60) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2025]
:  +- *(60) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(57) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7790]
:     :     +- Union
:     :        :- *(55) Filter ((col_1#1800 < 1) AND (measure_2#1803 < 1.0))
:     :        :  +- *(55) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(56) Filter ((col_1#1800 < 1) AND (measure_2#1803 < 1.0))
:     :           +- *(56) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(59) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(66) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2026]
:  +- *(66) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(63) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7807]
:     :     +- Union
:     :        :- *(61) Filter ((measure_1#1802 < 1.0) AND (measure_2#1803 < 1.0))
:     :        :  +- *(61) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(62) Filter ((measure_1#1802 < 1.0) AND (measure_2#1803 < 1.0))
:     :           +- *(62) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(65) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(72) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2029]
:  +- *(72) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(69) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7824]
:     :     +- Union
:     :        :- *(67) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (measure_2#1803 < 1.0))
:     :        :  +- *(67) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(68) Filter (((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (measure_2#1803 < 1.0))
:     :           +- *(68) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(71) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(78) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2027]
:  +- *(78) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(75) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7841]
:     :     +- Union
:     :        :- *(73) Filter ((col_2#1801 < 1) AND (measure_2#1803 < 1.0))
:     :        :  +- *(73) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(74) Filter ((col_2#1801 < 1) AND (measure_2#1803 < 1.0))
:     :           +- *(74) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(77) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
:- *(84) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2030]
:  +- *(84) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(81) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7858]
:     :     +- Union
:     :        :- *(79) Filter (((col_1#1800 < 1) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
:     :        :  +- *(79) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(80) Filter (((col_1#1800 < 1) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
:     :           +- *(80) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(83) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
:- *(90) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2031]
:  +- *(90) SortMergeJoin [col_1#1800], [col_1#1808], Inner
:     :- *(87) Sort [col_1#1800 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7875]
:     :     +- Union
:     :        :- *(85) Filter (((measure_1#1802 < 1.0) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
:     :        :  +- *(85) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     :        +- *(86) Filter (((measure_1#1802 < 1.0) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
:     :           +- *(86) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
:     +- *(89) Sort [col_1#1808 ASC NULLS FIRST], false, 0
:        +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7643]
+- *(96) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803, measure_2 AS found_filter#2032]
   +- *(96) SortMergeJoin [col_1#1800], [col_1#1808], Inner
      :- *(93) Sort [col_1#1800 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#7892]
      :     +- Union
      :        :- *(91) Filter ((((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
      :        :  +- *(91) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
      :        +- *(92) Filter ((((col_1#1800 < 1) AND (measure_1#1802 < 1.0)) AND (col_2#1801 < 1)) AND (measure_2#1803 < 1.0))
      :           +- *(92) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
      +- *(95) Sort [col_1#1808 ASC NULLS FIRST], false, 0
         +- ReusedExchange [col_1#1808], Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#7660]
"""

The query plan here is significantly longer, and performance degrades badly as the number of iterations of the for() loop increases.

How can I improve my performance?


There are 2 answers

vanhooser (BEST ANSWER)

This is a known limitation of iterative algorithms in Spark. At the moment, every iteration of the loop embeds the plan built so far into the new plan assigned to the outer df variable, so the plan roughly doubles in size on each pass.

This means your query planning takes O(exp(n)) time, where n is the number of iterations of your loop.
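The doubling can be seen with a toy, Spark-free model of the logical plan: each loop iteration unions the current plan with a filtered copy of itself, so the previous plan is embedded twice. Nested tuples stand in for plan nodes here; this is an illustration of the growth, not Spark's actual plan representation.

```python
def count_scans(plan):
    # count leaf "scan" nodes in a nested-tuple stand-in for a logical plan
    if plan == "scan":
        return 1
    return sum(count_scans(child) for child in plan[1:])

plan = "scan"
sizes = []
for _ in range(5):
    # each iteration unions the current plan with a filtered copy of itself,
    # embedding the previous plan twice
    plan = ("union", plan, ("filter", plan))
    sizes.append(count_scans(plan))

print(sizes)  # [2, 4, 8, 16, 32]
```

Five iterations already produce 32 leaf scans; the plan dump above shows the same pattern, with each branch re-scanning the source under an ever-longer conjunction of filters.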

There's a tool in Palantir Foundry called Transforms Verbs that can help with this.

Simply import transforms.verbs.dataframes.union_many and call it on the full set of DataFrames you wish to materialize (assuming your logic allows for it, i.e. one iteration of the loop doesn't depend upon the result of a prior iteration of the loop).

The code above should instead be modified to:

from pyspark.sql import types as T, functions as F, SparkSession
from transforms.verbs.dataframes import union_many

spark = SparkSession.builder.getOrCreate()

schema = T.StructType([
  T.StructField("col_1", T.IntegerType(), False),
  T.StructField("col_2", T.IntegerType(), False),
  T.StructField("measure_1", T.FloatType(), False),
  T.StructField("measure_2", T.FloatType(), False),
])
data = [
  {"col_1": 1, "col_2": 2, "measure_1": 0.5, "measure_2": 1.5},
  {"col_1": 2, "col_2": 3, "measure_1": 2.5, "measure_2": 3.5}
]

df = spark.createDataFrame(data, schema)

right_schema = T.StructType([
  T.StructField("col_1", T.IntegerType(), False)
])
right_data = [
  {"col_1": 1},
  {"col_1": 1},
  {"col_1": 2},
  {"col_1": 2}
]
right_df = spark.createDataFrame(right_data, right_schema)

df = df.unionByName(df)
df = df.join(right_df, on="col_1")
df.show()

"""
+-----+-----+---------+---------+
|col_1|col_2|measure_1|measure_2|
+-----+-----+---------+---------+
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
+-----+-----+---------+---------+
"""

df.explain()

"""
== Physical Plan ==
*(6) Project [col_1#1800, col_2#1801, measure_1#1802, measure_2#1803]
+- *(6) SortMergeJoin [col_1#1800], [col_1#1808], Inner
   :- *(3) Sort [col_1#1800 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(col_1#1800, 200), ENSURE_REQUIREMENTS, [id=#5454]
   :     +- Union
   :        :- *(1) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
   :        +- *(2) Scan ExistingRDD[col_1#1800,col_2#1801,measure_1#1802,measure_2#1803]
   +- *(5) Sort [col_1#1808 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col_1#1808, 200), ENSURE_REQUIREMENTS, [id=#5460]
         +- *(4) Scan ExistingRDD[col_1#1808]
"""

filter_union_cols = ["col_1", "measure_1", "col_2", "measure_2"]
df = df.withColumn("found_filter", F.lit(None))
union_dfs = []
for filter_col in filter_union_cols:
  stats = df.filter(F.col(filter_col) < F.lit(1)).drop("found_filter")
  union_df = stats.select(
    "*",
    F.lit(filter_col).alias("found_filter")
  )
  union_dfs.append(union_df)

df = df.unionByName(
  union_many(union_dfs)
)

This will optimize your unions and take significantly less time.

The bottom line: beware of using any union calls inside for/while loops. If you must use this pattern, collect the DataFrames first and use the transforms.verbs.dataframes.union_many verb to combine the final set of DataFrames in one step.
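If you are not on Foundry and don't have transforms.verbs available, you can approximate the same effect by hand with a balanced (pairwise) union instead of a left-deep chain. The helper below, union_pairwise, is a hypothetical sketch, not part of the PySpark API; it only assumes its inputs expose unionByName, so it works on any pyspark.sql.DataFrame.

```python
def union_pairwise(dfs):
    # Combine DataFrames in a balanced tree rather than a left-deep chain:
    # each pass halves the list, giving O(log n) union depth instead of the
    # O(n) nesting produced by calling unionByName inside a loop.
    items = list(dfs)
    if not items:
        raise ValueError("need at least one DataFrame")
    while len(items) > 1:
        items = [
            items[i].unionByName(items[i + 1]) if i + 1 < len(items) else items[i]
            for i in range(0, len(items), 2)
        ]
    return items[0]
```

Used in place of the union_many call above (df = df.unionByName(union_pairwise(union_dfs))), this keeps the plan tree shallow even when the list of DataFrames is long.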

Check out your platform documentation for more information and more helpful Verbs.

Protip: use the optimization included over here to further increase your performance.

Misa

If, while trying to implement the OP's solution, you receive an error about lists not having a unionByName/union_many attribute, unpack union_dfs in the last line with an asterisk prefix, like so:

df = df.unionByName(
    union_many(*union_dfs)
)

Apparently, union_many does not unpack a list of DataFrame references on its own; it expects the DataFrames as separate arguments.
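The distinction is ordinary Python argument unpacking: a variadic function receives a whole list as a single positional argument unless it is splatted with *. A toy stand-in (no Spark required) makes this visible:

```python
def union_many(*dfs):
    # toy variadic stand-in: report how many positional arguments it received
    return len(dfs)

frames = ["df_a", "df_b", "df_c"]  # stand-ins for DataFrames
print(union_many(frames))   # passed as one argument: prints 1
print(union_many(*frames))  # unpacked into three arguments: prints 3
```

With the un-splatted call, the real union_many would try to treat the list itself as a DataFrame, hence the missing-attribute error.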