pyspark 3.4.0 row-wise reduction taking too long


The following code illustrates the problem:

import pyspark.sql.functions as F
import time
from functools import reduce
from operator import add
from pyspark.sql import SparkSession

master = "local"
executor_memory = "4g"
driver_memory = "4g"

spark = SparkSession.builder.config("spark.master", master)\
    .config("spark.executor.memory", executor_memory)\
    .config("spark.driver.memory", driver_memory)\
    .getOrCreate()

NCOLS = 30

# Build a single-row DataFrame with NCOLS string-literal columns named "0".."29".
col_names = [f"{i}" for i in range(NCOLS)]
col_vals = [F.lit(f"{i}").alias(f"{i}") for i in range(NCOLS)]
df = spark.createDataFrame([('id',)], schema='id STRING')
df_data = df.select(["*"] + col_vals)

st = time.time()
# Row-wise average: fold the columns together with operator.add, then divide by NCOLS.
df_final = df_data.withColumn("row_avg", reduce(add, [F.col(x) for x in col_names]) / NCOLS)
df_final.count()
print(f"processing time is {(time.time() - st) / 60} minutes")

This code takes about 4 minutes on PySpark 3.4.0 and 3.4.1, while it runs fast on 3.2.4 up to 3.3.3 and also on 3.5.0. Can anyone suggest whether this is a defect or some change in behavior? The code runs fast if NCOLS is smaller than 25; as NCOLS increases beyond 25, processing time gets progressively worse, as the timing sweep below illustrates.
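To see the scaling, here is a minimal timing sweep (a sketch reusing the spark session and imports above; the NCOLS values chosen here are arbitrary):

# Hypothetical timing sweep: rebuild the single-row DataFrame for several
# column counts and time the row-average job for each.
for ncols in (10, 20, 25, 30):
    names = [f"{i}" for i in range(ncols)]
    vals = [F.lit(f"{i}").alias(f"{i}") for i in range(ncols)]
    data = spark.createDataFrame([('id',)], schema='id STRING').select(["*"] + vals)
    st = time.time()
    data.withColumn("row_avg", reduce(add, [F.col(x) for x in names]) / ncols).count()
    print(f"NCOLS={ncols}: {time.time() - st:.1f} s")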

I have checked the behavior on PySpark 3.2.4 and 3.3.3, where the code works well, suggesting there may be some change in behavior in PySpark 3.4.0. I also tried rewriting the reduction using sum() and still see the same behavior; the variant I tried looks roughly like the sketch below.
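A sketch of the sum() rewrite (Python's built-in sum folds the Column objects with +, starting from 0, so it builds essentially the same expression tree as reduce):

# Equivalent rewrite with the built-in sum(); shows the same slowdown on 3.4.0.
df_final = df_data.withColumn("row_avg", sum([F.col(x) for x in col_names]) / NCOLS)
df_final.count()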


1 Answer

user2314737:

I can confirm that the script takes ~4 minutes with PySpark 3.4 but just a few seconds with PySpark 3.5.

This looks like a bug.

I ran the script with PySpark 3.4. By looking at the logging messages with

spark-submit yourScript.py

it looks like it was the CodeGenerator that took the 4 minutes (from 07:34:50 to 07:38:52):

2023-10-18 07:34:50 INFO  ContextHandler:921 - Started o.s.j.s.ServletContextHandler@ec69003{/static/sql,null,AVAILABLE,@Spark}
2023-10-18 07:38:52 INFO  CodeGenerator:60 - Code generated in 767.824343 ms

I increased the log level to "trace" to understand what was going on during that time, and here is where the 3 minutes get lost (from 07:44:34 to 07:47:43):

2023-10-18 07:44:34 TRACE HeartbeatReceiver:68 - Checking for hosts with no recent heartbeats in HeartbeatReceiver.

2023-10-18 07:47:43 TRACE BaseSessionStateBuilder$$anon$1:68 - Fixed point reached for batch UpdateNullability after 1 iterations.
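For anyone who wants to reproduce the trace output, one way (an assumption about setup; editing conf/log4j2.properties works as well) is to raise the log level at runtime from the script itself:

# Raise the log level so analyzer/optimizer/codegen activity between these
# timestamps becomes visible; uses the same `spark` session as the question.
spark.sparkContext.setLogLevel("TRACE")

Calling this right after creating the session, before the withColumn/count step, captures the analyzer batches such as UpdateNullability seen above.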