On Kubernetes, my Spark worker pod is trying to access the Thrift pod by name


Okay. Where to start? I am deploying a set of Spark applications to a Kubernetes cluster. I have one Spark Master, 2 Spark Workers, MariaDB, a Hive Metastore (that uses MariaDB - and it's not a full Hive install - it's just the Metastore), and a Spark Thrift Server (that talks to Hive Metastore and implements the Hive API).

This setup works pretty well for everything except the Thrift Server job (start-thriftserver.sh in the Spark sbin directory on the thrift server pod). By "works well" I mean that from outside the cluster I can create Spark jobs and submit them to the master, and in the Web UI I can see that my test app ran to completion using both workers.

Now the problem. When you launch start-thriftserver.sh, it submits a job to the cluster with itself as the driver (which I believe is correct behavior). When I look at the related Spark job in the Web UI, I see that it has workers, and they repeatedly get launched and then exit shortly thereafter. When I look at the workers' stderr logs, I see that every worker launches and tries to connect back to the thrift server pod at spark.driver.port. I believe this is correct behavior too. The gotcha is that the connection fails with an unknown host exception: the worker looks up the thrift server by its raw Kubernetes pod name (not a service name, and with no IP in the name) and reports that it can't find the thrift server that initiated the connection. Kubernetes DNS registers service names, and it registers pods only under names derived from their private IP. In other words, the raw name of the pod (without an IP) is never registered with DNS. Resolving raw pod names is simply not how Kubernetes works.

So, my question: I am struggling to figure out why the Spark worker pod is using a raw pod name to try to find the thrift server. It seems it should never do this, and that such a request could never be satisfied. I have wondered whether there is some Spark config setting that would tell the workers that the (thrift) driver they need to look for is actually spark-thriftserver.my-namespace.svc. But I can't find anything, despite a lot of searching.

There are so many settings that go into a cluster like this that I don't want to barrage you with info. One thing that might clarify my setup: the following string is dumped at the top of a worker log that fails. Notice the raw pod name of the thrift server for driver-url. If anyone has any clue what steps to take to fix this please let me know. I'll edit this post and share settings etc as people request them. Thanks for helping.

Spark Executor Command: "/usr/lib/jvm/java-1.8-openjdk/jre/bin/java" "-cp" "/spark/conf/:/spark/jars/*" "-Xmx512M" "-Dspark.master.port=7077" "-Dspark.history.ui.port=18081" "-Dspark.ui.port=4040" "-Dspark.driver.port=41617" "-Dspark.blockManager.port=41618" "-Dspark.master.rest.port=6066" "-Dspark.master.ui.port=8080" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@spark-thriftserver-6bbb54768b-j8hz8:41617" "--executor-id" "12" "--hostname" "172.17.0.6" "--cores" "1" "--app-id" "app-20220408001035-0000" "--worker-url" "spark://[email protected]:37369"

There is 1 answer

Answered by Eugene Lopatkin

This is a brief guide on how to successfully run Spark Thrift Service on Kubernetes:

Spark Thrift Server, a multi-user JDBC/ODBC-accessible service for Spark, can be deployed on Kubernetes only in client mode. This follows from its design: the Thrift Server process is itself the Spark driver, and in client mode the driver runs wherever the spark-submit command was executed. To work within this limitation, launch the Thrift Server in client mode from a pod inside the Kubernetes cluster, which makes that pod the driver.

There are several key points to consider when running Spark on Kubernetes in client mode:

  • Client Mode Networking: Spark executors must be able to connect to the Spark driver over a routable hostname and port. You can use a headless service to give your driver pod a stable, routable hostname. This can be specified via spark.driver.host and spark.driver.port.
  • Client Mode Executor Pod Garbage Collection: If your Spark driver runs in a pod, it’s highly recommended to set spark.kubernetes.driver.pod.name to the name of that pod. This ensures that all the application’s executor pods are deleted once the driver pod is removed from the cluster.
  • Executor Pod Cleanup: If your application isn’t running inside a pod, or if spark.kubernetes.driver.pod.name isn't set when your application is running in a pod, the executor pods may not be properly deleted when the application exits.
  • Controlling Executor Pod Names: You can use spark.kubernetes.executor.podNamePrefix to control the executor pod names. Make this unique across all jobs in the same namespace to avoid conflicts.
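
To make the four settings above concrete, here is a minimal Python sketch that assembles them into spark-submit style --conf arguments. The function name client_mode_confs and the example values (the service name, port 7078, the pod name, and the thrift-exec prefix) are illustrative assumptions; only the configuration keys themselves come from Spark.

```python
import shlex


def client_mode_confs(driver_host, driver_port, driver_pod_name, executor_prefix):
    """Return spark-submit style --conf arguments for client mode on Kubernetes.

    All argument values are supplied by the caller; nothing here is discovered
    from the cluster.
    """
    settings = {
        # Routable hostname and port that executors use to reach the driver.
        "spark.driver.host": driver_host,
        "spark.driver.port": str(driver_port),
        # Lets Kubernetes clean up executor pods when the driver pod is removed.
        "spark.kubernetes.driver.pod.name": driver_pod_name,
        # Should be unique across jobs in the same namespace.
        "spark.kubernetes.executor.podNamePrefix": executor_prefix,
    }
    args = []
    for key, value in settings.items():
        args += ["--conf", f"{key}={value}"]
    return args


if __name__ == "__main__":
    # Hypothetical values chosen for illustration only.
    args = client_mode_confs(
        "spark-thrift-service", 7078, "spark-thrift-server-0", "thrift-exec"
    )
    print(" ".join(shlex.quote(a) for a in args))
```

The printed flags can be appended to a start-thriftserver.sh or spark-submit command line.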

The deployment consists of two parts: a Service and a StatefulSet. The Service is crucial for network access within the cluster and allows Spark executors to connect to the Spark driver over a routable hostname and port. You can define a headless Service that exposes two ports: one for the Thrift Server and another for the Spark driver.
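
If executors still fail with an unknown host error, it can help to check from inside an executor or worker pod whether the driver's hostname resolves and whether the driver port accepts connections. The following Python sketch is one way to do that, using only the standard library. The helper names resolve and can_connect are hypothetical, and the host and port you pass in are assumed to match your spark.driver.host and spark.driver.port.

```python
import socket


def resolve(host):
    """Return the sorted IP addresses a hostname resolves to, or [] if lookup fails."""
    try:
        infos = socket.getaddrinfo(host, None, proto=socket.IPPROTO_TCP)
    except socket.gaierror:
        # Name resolution failed; this is the situation behind an unknown host error.
        return []
    # Each entry is (family, type, proto, canonname, sockaddr); sockaddr[0] is the IP.
    return sorted({info[4][0] for info in infos})


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers resolution failures, refused connections, and timeouts.
        return False
```

For example, with a correctly configured headless Service, calling resolve("spark-thrift-service") from a pod in the same namespace should return the driver pod's IP, and can_connect("spark-thrift-service", 7078) should return True once the driver is listening.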

The StatefulSet manages the deployment of the Spark Thrift Server. In its configuration you specify the Spark Thrift Server image, the command that starts the Thrift Server, and several Spark configurations. The decision to use a StatefulSet rather than a Deployment or ReplicaSet was driven primarily by the need for a consistent pod name, which the spark.kubernetes.driver.pod.name setting requires. A StatefulSet provides stable, predictable pod names that survive rescheduling.
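
For reference, the stable names a StatefulSet produces follow a fixed pattern: pods are named <statefulset-name>-<ordinal>, and with a headless governing Service each pod also gets a DNS name of the form <pod>.<service>.<namespace>.svc.<cluster-domain>. The short Python sketch below just spells that pattern out. The function names are hypothetical, and the "default" namespace and the cluster.local domain are assumptions about the cluster.

```python
def stateful_pod_name(statefulset_name, ordinal):
    """Name Kubernetes gives to the pod with the given ordinal in a StatefulSet."""
    return f"{statefulset_name}-{ordinal}"


def stateful_pod_fqdn(statefulset_name, ordinal, service_name, namespace,
                      cluster_domain="cluster.local"):
    """Per-pod DNS name provided by the StatefulSet's headless governing Service."""
    pod = stateful_pod_name(statefulset_name, ordinal)
    return f"{pod}.{service_name}.{namespace}.svc.{cluster_domain}"


if __name__ == "__main__":
    # Names match the example manifest; the "default" namespace is an assumption.
    print(stateful_pod_name("spark-thrift-server", 0))
    print(stateful_pod_fqdn("spark-thrift-server", 0, "spark-thrift-service", "default"))
```

With replicas: 1 the only pod has ordinal 0, which is why spark-thrift-server-0 can be hard-coded as the value of spark.kubernetes.driver.pod.name.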

In conclusion, running Spark Thrift Server on Kubernetes necessitates running the server in client mode due to design limitations.

Short example:

apiVersion: v1
kind: Service
metadata:
  name: spark-thrift-service
spec:
  clusterIP: None
  selector:
    app: spark-thrift-server
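  ports:
    # Port the executors use to connect back to the driver (spark.driver.port below).
    - protocol: TCP
      name: spark-driver-port
      port: 7078
      targetPort: 7078
    # JDBC/ODBC port of the Thrift Server. 10000 is the HiveServer2 default
    # (hive.server2.thrift.port); adjust it if you override that setting.
    - protocol: TCP
      name: thrift-server-port
      port: 10000
      targetPort: 10000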
---
apiVersion: v1
kind: Secret
metadata:
  name: kubernetes-config
type: Opaque
data:
  config: |
    <base64 encoded config>
  ca.pem: |
    <base64 encoded key>
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: spark-thrift-server
spec:
  serviceName: spark-thrift-service
  replicas: 1
  selector:
    matchLabels:
      app: spark-thrift-server
  template:
    metadata:
      labels:
        app: spark-thrift-server
    spec:
      containers:
        - name: thrift-server
          image: apache/spark:3.4.0
          command:
            - 'bash'
            - '-c'
            - >-
              /opt/spark/sbin/start-thriftserver.sh
              --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port>
              --packages com.typesafe:config:1.4.2,org.apache.hadoop:hadoop-aws:3.3.4,org.apache.spark:spark-hadoop-cloud_2.12:3.4.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0
              --conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp"
              --conf spark.dynamicAllocation.enabled=true
              --conf spark.dynamicAllocation.shuffleTracking.enabled=true
              --conf spark.hadoop.fs.s3a.access.key=<your-access-key>
              --conf spark.hadoop.fs.s3a.committer.name=directory
              --conf spark.hadoop.fs.s3a.endpoint=<your-endpoint>
              --conf spark.hadoop.fs.s3a.fast.upload=true
              --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
              --conf spark.hadoop.fs.s3a.secret.key=<your-secret-key>
              --conf spark.hadoop.hive.metastore.uris=thrift://<hive-metastore-host>:<hive-metastore-port>
              --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
              --conf spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored=true
              --conf spark.hadoop.parquet.enable.summary-metadata=false
              --conf spark.kubernetes.container.image=apache/spark:v3.4.0
              --conf spark.kubernetes.file.upload.path=s3a://some/tmp/folder/thriftserver
              --conf spark.sql.catalogImplementation=hive
              --conf spark.sql.hive.metastorePartitionPruning=true
              --conf spark.sql.parquet.filterPushdown=true
              --conf spark.sql.parquet.mergeSchema=false
              --conf spark.sql.parquet.output.committer.class=org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
              --conf spark.sql.sources.commitProtocolClass=org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
              --conf spark.sql.sources.default=parquet
              --conf spark.sql.sources.partitionOverwriteMode=dynamic
              --conf spark.sql.warehouse.dir=s3a://warehouse
              --conf spark.kubernetes.driver.pod.name=spark-thrift-server-0
              --conf spark.driver.host=spark-thrift-service
              --conf spark.driver.port=7078
              && tail -f /dev/null

          volumeMounts:
            - name: kubernetes-config
              mountPath: /root/.kube
      volumes:
        - name: kubernetes-config
          secret:
            secretName: kubernetes-config