I have a problem. I'm using a Spark cluster via Spark Connect Server in Airflow. Everything runs in Docker containers.
I have no problem preprocessing my data, or even showing the DataFrame in the logs with spark_df.show().
But when I want to use spark_df.count() to get the number of rows, I get this error:
java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
I tried changing my spark-defaults.conf, but it did not resolve the issue.
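For context, the Spark session in the Airflow task is a Spark Connect client session, created roughly like this (the host name and port are assumptions based on the docker-compose file below):

from pyspark.sql import SparkSession

# Connect to the Spark Connect server; "spark-connect" and 15002 are assumed
# from the docker-compose setup shown further down
spark = SparkSession.builder.remote("sc://spark-connect:15002").getOrCreate()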
Here is the function I use to read a Parquet file into a pandas-on-Spark DataFrame:
import pyspark.pandas as ps
from pyspark.sql import SparkSession

def spark_read_parquet_to_psdf(filename: str, spark: SparkSession) -> ps.DataFrame:
    spark_df = spark.read \
        .option("header", "true") \
        .parquet(f"/tmp/{filename}")
    spark_df.show()
    spark_df.cache()
    df = spark_df.pandas_api()
    return df
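For reference, this is roughly how the helper is used in the Airflow task; the file name is a placeholder and /tmp is the shared volume from the docker-compose file below:

# "my_data.parquet" is a placeholder for whatever an earlier task wrote to /tmp
psdf = spark_read_parquet_to_psdf("my_data.parquet", spark)
print(psdf)   # this hits the same error as spark_df.count(), see below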
The only difference when I want to work with a plain Spark DataFrame is that the last line of the function changes, as sketched below.
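For illustration, a sketch of that variant, assuming the only change is returning the Spark DataFrame instead of converting it (the name spark_read_parquet_to_sdf is just for this example):

def spark_read_parquet_to_sdf(filename: str, spark: SparkSession):
    spark_df = spark.read \
        .option("header", "true") \
        .parquet(f"/tmp/{filename}")
    spark_df.show()
    spark_df.cache()
    return spark_df   # instead of spark_df.pandas_api()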
I also tried to run print(df) and got the same error.
Do you have any idea what the problem could be?
Here is my Dockerfile (Dockerfile.spark):
FROM ubuntu:23.10
RUN apt update -y && \
    apt upgrade -y
RUN apt install -y \
    default-jdk \
    wget \
    ssh \
    openssh-server
RUN wget https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz && \
    tar xvf spark-3.5.0-bin-hadoop3.tgz && \
    mv spark-3.5.0-bin-hadoop3 /opt/spark
ENV SPARK_HOME=/opt/spark
ENV PATH=$PATH:$SPARK_HOME/bin
ENV SPARK_CONF_DIR=/opt/spark/conf
RUN mkdir /tmp/spark-events
COPY spark_conf_files/init_hosts.sh /usr/local/bin/
COPY spark_conf_files/spark_jars/ /opt/spark/jars
COPY spark_conf_files/spark-defaults.conf /opt/spark/conf/
RUN chmod +x /usr/local/bin/init_hosts.sh
RUN chmod 777 /opt/spark/jars/sqljdbc42.jar
RUN chmod 777 /opt/spark/jars/pgjdbc_42.7.0.jar
ENTRYPOINT ["/usr/local/bin/init_hosts.sh"]
And here is my docker-compose.yml:

version: '3.8'
services:
  spark-connect:
    container_name: 'spark-connect'
    build:
      context: .
      dockerfile: Dockerfile.spark
    ports:
      - "50131:4040"
      - "50132:15002"
    command: bash -c "
      sleep 10 &&
      /opt/spark/sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.0 --master spark://spark-master:7077 &&
      tail -f /dev/null"
    tty: true
    networks:
      - default
    environment:
      - HADOOP_HOME=/etc/hadoop
      - HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
      - LD_LIBRARY_PATH=$HADOOP_HOME/lib/native:$LD_LIBRARY_PATH
    volumes:
      - spark-shared-volume:/tmp

  spark-master:
    container_name: 'spark-master'
    build:
      context: .
      dockerfile: Dockerfile.spark
    ports:
      - "50110:8080"
      - "50111:7077"
    command: bash -c "bash /etc/init.d/ssh start &&
      /opt/spark/sbin/start-master.sh &&
      tail -f /dev/null"
    tty: true
    networks:
      - default
    environment:
      - SPARK_MASTER_WEBUI_PORT=8080
    volumes:
      - spark-shared-volume:/tmp

  spark-worker:
    build:
      context: .
      dockerfile: Dockerfile.spark
    command: bash -c "sleep 10 &&
      /opt/spark/sbin/start-worker.sh spark://spark-master:7077 &&
      tail -f /dev/null"
    ports:
      - "50121-50122:8081"
    depends_on:
      - spark-master
    environment:
      - SPARK_MASTER_URL=spark://spark-master:7077
    networks:
      - default
    deploy:
      mode: replicated
      replicas: 2
    volumes:
      - spark-shared-volume:/tmp

volumes:
  spark-shared-volume:
And here is my spark-defaults.conf:
spark.executor.memory 8g
spark.executor.memoryOverhead 1g
spark.executor.cores 2
spark.sql.execution.arrow.pyspark.enabled true
spark.driver.cores 2
spark.driver.memory 8g
spark.network.timeout 600s
spark.files.fetchTimeout 600s
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.jars /opt/spark/jars/sqljdbc42.jar,/opt/spark/jars/pgjdbc_42.7.0.jar
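To double-check that the server actually picks up this file, the values can be read back from the Connect session (a quick sanity check, not a fix; spark is the session from the sketch at the top):

# Read back a SQL conf that is set in spark-defaults.conf
print(spark.version)
print(spark.conf.get("spark.sql.execution.arrow.pyspark.enabled"))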
EDIT: I realised that the error always shows up when I run .collect().
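So, continuing the sketches above, the behaviour looks like this (assuming count() and print() fail for the same underlying reason as collect()):

# spark_df: the Spark DataFrame from the variant sketch; psdf: the pandas-on-Spark DataFrame
spark_df.show()      # works, the rows show up in the Airflow task logs
spark_df.collect()   # ClassCastException
spark_df.count()     # ClassCastException
print(psdf)          # same ClassCastException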