Spark error while using count() or pandas API len(df)


I have a problem. I'm using a Spark cluster via Spark Connect Server in Airflow. Everything runs in Docker containers.
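For context, the Spark session in the Airflow task is created through Spark Connect, roughly like this (host and port correspond to the spark-connect container shown further down):

from pyspark.sql import SparkSession

# Spark Connect session used inside the Airflow task;
# "spark-connect" / 15002 match the spark-connect service in my docker-compose
spark = SparkSession.builder \
    .remote("sc://spark-connect:15002") \
    .getOrCreate()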

I have no problem preprocessing my data, or even showing the DataFrame in the logs with spark_df.show().

But when I want to use spark_df.count() to get the number of rows of the data, I get this error:

java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
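Concretely, both of these fail the same way, while spark_df.show() works fine (spark_df and df come from the function shown below):

nrows = spark_df.count()   # raises the ClassCastException
nrows = len(df)            # same error through the pandas API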

I tried changing my spark-defaults.conf, but it did not resolve the issue.

Here is the function I use to read a Parquet file into a DataFrame:

import pyspark.pandas as ps
from pyspark.sql import SparkSession


def spark_read_parquet_to_psdf(filename: str, spark: SparkSession) -> ps.DataFrame:
    # Read the Parquet file from the shared /tmp volume
    spark_df = spark.read \
        .option("header", "true") \
        .parquet(f"/tmp/{filename}")

    spark_df.show()    # this works fine
    spark_df.cache()

    # Convert to a pandas-on-Spark DataFrame
    df = spark_df.pandas_api()

    return df

The only difference when I want to work with a plain Spark DataFrame instead is the last part of the function, as shown below.
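i.e. the function then ends with something like:

    return spark_df  # instead of converting with spark_df.pandas_api()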

I also tried running print(df) and got the same error.

Do you have any idea what the problem could be?

Here is my Dockerfile (Dockerfile.spark), followed by my docker-compose.yml:

FROM ubuntu:23.10

RUN apt update -y && \
    apt upgrade -y

RUN apt install -y \
    default-jdk \
    wget \
    ssh \
    openssh-server


RUN wget https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz && \
    tar xvf spark-3.5.0-bin-hadoop3.tgz && \
    mv spark-3.5.0-bin-hadoop3 /opt/spark

ENV SPARK_HOME=/opt/spark
ENV PATH=$PATH:$SPARK_HOME/bin
ENV SPARK_CONF_DIR=/opt/spark/conf

RUN mkdir /tmp/spark-events

COPY spark_conf_files/init_hosts.sh /usr/local/bin/

COPY spark_conf_files/spark_jars/ /opt/spark/jars
COPY spark_conf_files/spark-defaults.conf /opt/spark/conf/

RUN chmod +x /usr/local/bin/init_hosts.sh
RUN chmod 777 /opt/spark/jars/sqljdbc42.jar
RUN chmod 777 /opt/spark/jars/pgjdbc_42.7.0.jar

ENTRYPOINT ["/usr/local/bin/init_hosts.sh"]
And the docker-compose.yml:

version: '3.8'
services:
  spark-connect:
    container_name: 'spark-connect'
    build: 
      context: .
      dockerfile: Dockerfile.spark
    ports:
      - "50131:4040"
      - "50132:15002"
    command: bash -c "
      sleep 10 &&
      /opt/spark/sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.0 --master spark://spark-master:7077 &&
      tail -f /dev/null"
    tty: true
    networks:
      - default
    environment:
      - HADOOP_HOME=/etc/hadoop
      - HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
      - LD_LIBRARY_PATH=$HADOOP_HOME/lib/native:$LD_LIBRARY_PATH
    volumes:
      - spark-shared-volume:/tmp

  spark-master:
    container_name: 'spark-master'
    build: 
      context: .
      dockerfile: Dockerfile.spark
    ports:
      - "50110:8080"
      - "50111:7077"
    command: bash -c "bash /etc/init.d/ssh start &&
      /opt/spark/sbin/start-master.sh &&
      tail -f /dev/null"
    tty: true
    networks:
      - default
    environment:
      - SPARK_MASTER_WEBUI_PORT=8080
    volumes:
      - spark-shared-volume:/tmp

  spark-worker:
    build: 
      context: .
      dockerfile: Dockerfile.spark
    command: bash -c "sleep 10 &&
      /opt/spark/sbin/start-worker.sh spark://spark-master:7077 &&
      tail -f /dev/null"
    ports:
      - "50121-50122:8081" 
    depends_on:
      - spark-master
    environment:
      - SPARK_MASTER_URL=spark://spark-master:7077
    networks:
      - default
    deploy:
      mode: replicated
      replicas: 2
    volumes:
      - spark-shared-volume:/tmp

volumes:
  spark-shared-volume:

And here is my spark-defaults.conf:

spark.executor.memory 8g
spark.executor.memoryOverhead 1g
spark.executor.cores 2

spark.sql.execution.arrow.pyspark.enabled true
spark.driver.cores 2
spark.driver.memory 8g
spark.network.timeout 600s
spark.files.fetchTimeout 600s

spark.serializer org.apache.spark.serializer.KryoSerializer
spark.jars /opt/spark/jars/sqljdbc42.jar,/opt/spark/jars/pgjdbc_42.7.0.jar

EDIT: I realised that the error always appears when I run .collect().
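For example, something as simple as this triggers it:

rows = spark_df.collect()  # same java.lang.ClassCastException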
