The following code illustrates the problem.
import pyspark.sql.functions as F
import time
from functools import reduce
from operator import add
from pyspark.sql import SparkSession
master = "local"
executor_memory = "4g"
driver_memory = "4g"
spark = SparkSession.builder.config("spark.master", master)\
.config("spark.executor.memory", executor_memory)\
.config("spark.driver.memory", driver_memory)\
.getOrCreate()
NCOLS = 30
# Build a single-row DataFrame and add NCOLS literal string columns named "0" .. str(NCOLS - 1)
col_names = [f"{i}" for i in range(NCOLS)]
col_vals = [F.lit(f"{i}").alias(f"{i}") for i in range(NCOLS)]
df = spark.createDataFrame([('id',)], schema='id STRING')
df_data = df.select(["*"] + col_vals)
st = time.time()
# Row-wise average over the NCOLS columns; this is the expression that becomes slow
df_final = df_data.withColumn("row_avg", reduce(add, [F.col(x) for x in col_names]) / NCOLS)
df_final.count()
print(f"processing time is {(time.time() - st) / 60} minutes")
This code takes about 4 minutes on PySpark 3.4.0 and 3.4.1, while it runs fast on 3.2.4 through 3.3.3 and also on 3.5.0. Can anyone suggest whether this is a defect or a change in behavior? The code runs fast when NCOLS is smaller than 25; as NCOLS increases beyond 25, the processing time gets worse.
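A rough sweep over NCOLS, reusing the session and imports from the snippet above, can be used to see the scaling (illustrative only; the chosen values are arbitrary and timings vary by machine):
# Illustrative sweep over NCOLS to show how the runtime grows with column count
for ncols in (10, 20, 25, 30):
    names = [f"{i}" for i in range(ncols)]
    base = spark.createDataFrame([('id',)], schema='id STRING')
    data = base.select(["*"] + [F.lit(f"{i}").alias(f"{i}") for i in range(ncols)])
    st = time.time()
    data.withColumn("row_avg", reduce(add, [F.col(x) for x in names]) / ncols).count()
    print(f"NCOLS={ncols}: {time.time() - st:.1f} s")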
I have checked the behavior on PySpark 3.2.4 and 3.3.3, where the code works well, suggesting that there may be some change in behavior in PySpark 3.4.0. I also tried rewriting the expression using sum() and still see the same behavior.
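A sketch of that sum()-based rewrite looks roughly like this (illustrative; the exact code may have differed). Python's built-in sum() builds essentially the same nested Add expression, which is consistent with it showing the same slowdown:
# Sketch of a sum()-based variant; internally this builds ((0 + c0) + c1) + ...
df_final_sum = df_data.withColumn(
    "row_avg", sum([F.col(x) for x in col_names]) / NCOLS
)
df_final_sum.count()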
I can confirm that the script takes ~4 minutes with PySpark 3.4 but just a few seconds with PySpark 3.5.
This looks like a bug.
I ran the script with PySpark 3.4. Looking at the logging messages, it appears it was the CodeGenerator that took the 4 minutes (from 07:34:50 to 07:38:52), so I increased the log level to "trace" to understand what was going on during that time.
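One way to surface those trace messages from PySpark is shown below (an assumption about how the level was raised; it could equally be done through the log4j2 configuration instead):
# Illustrative: raise the root log level so trace output from
# org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator is emitted.
# Note that setLogLevel affects all loggers, so the output is verbose.
spark.sparkContext.setLogLevel("TRACE")
df_final.count()  # rerun the slow action with trace logging enabled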
Here is where 3 minutes get lost (from 07:44:34 to 07:47:43):