Using Spark Connect with Scala


I would like to use the new Spark Connect feature within a Scala program.

I started the Connect server and I can connect to it from PySpark, both interactively and when submitting a Python script, e.g. with spark-submit --remote sc://localhost connect_test.py

However, if I try to submit a Scala application, I get an exception telling me to set a master URL: Exception in thread "main" org.apache.spark.SparkException: A master URL must be set in your configuration

Adding --master local as well fails, with a message that remote and master cannot both be set at the same time (as stated in the documentation).

I also tried setting the SPARK_REMOTE environment variable, which does not work either and fails with the same messages as above.

Calling SparkSession.builder().remote("sc://localhost") is also not possible, because there is no remote method on org.apache.spark.sql.SparkSession. There is a file connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala that defines another SparkSession class with such a remote method, but I am not sure how to use it.
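
For reference, this is roughly the call I am trying to make (a sketch only; the object name is a placeholder, and it would only compile if SparkSession resolves to the Connect client's class rather than the one from plain spark-sql):

import org.apache.spark.sql.SparkSession  // must resolve to the class from spark-connect-client-jvm

object RemoteSessionSketch {
  def main(args: Array[String]): Unit = {
    // remote(...) only exists on the Connect client's builder; build() is
    // what that builder exposes in 3.4.0 (see the answer below).
    val spark = SparkSession.builder()
      .remote("sc://localhost")
      .build()

    spark.range(5).collect().foreach(println)
  }
}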

Is there an implicit conversion function that I have to include?

Was anyone able to use Spark Connect with Scala or is it only available for Python currently?


Update

I was able to compile and execute my test program with the following settings:

  • I have the Spark Connect client JVM artifact and Spark SQL as compile dependencies, the latter marked as provided in my build.sbt.
  • When doing spark-submit, I added the client JVM jar to the driver classpath with --driver-class-path /path/to/spark-connect-client-jvm.jar.

Now I can start the application. However, I thought it was possible to run it as a normal Java application, without spark-submit?


1 Answer

Answered by hage

I finally got it working.

For anyone facing the same issues:

My test program:

package com.acme.hage

// SparkSession here resolves to the class shipped in spark-connect-client-jvm.
import org.apache.spark.sql.SparkSession

object SparkConnectTest {

  def main(args: Array[String]): Unit = {
    // Connect to the local Spark Connect server; the 3.4.0 client builder
    // exposes build() rather than getOrCreate().
    val spark = SparkSession.builder().remote("sc://localhost").build()

    // Read /etc/passwd (as seen by the Connect server) as a colon-separated
    // CSV, collect the rows to the client, and print them.
    spark.read
      .option("sep", ":")
      .csv("/etc/passwd")
      .collect.foreach(r => println(r.mkString(" | ")))
  }
}

Note that, unlike the current Spark Connect example page, it is build() instead of getOrCreate() to obtain the Spark session.
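
For comparison, here is a minimal sketch of the classic builder chain that the regular Spark documentation shows (this targets plain spark-sql, not the Connect client, and the app name is just an illustrative placeholder). Since the Connect client ships its own org.apache.spark.sql.SparkSession under the same fully qualified name, which builder you actually compile against depends on which jar is on the classpath:

import org.apache.spark.sql.SparkSession

// Classic (non-Connect) session from spark-sql: master(...) plus getOrCreate().
val classicSpark = SparkSession.builder()
  .appName("classic-example") // placeholder name, only for illustration
  .master("local[*]")
  .getOrCreate()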

My build.sbt:

name := "spark-connect-test"

version := "0.0.1"

scalaVersion := "2.12.17"

libraryDependencies += "org.apache.spark" %% "spark-connect-client-jvm" % "3.4.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.4.0"  % "provided"

And I can run the program with either spark-submit or as a "normal" Java application:

  • spark-submit --name connect-test --class com.acme.hage.SparkConnectTest --remote "sc://localhost" --driver-class-path /Users/hage/code/lib/spark/spark/jars/spark-core_2.12-3.4.0.jar:/Users/hage/.ivy2/jars/org.apache.spark_spark-connect-client-jvm_2.12-3.4.0.jar ./target/scala-2.12/spark-connect-test_2.12-0.0.1.jar
  • java -cp "./target/scala-2.12/spark-connect-test_2.12-0.0.1.jar:/Users/hage/.ivy2/jars/org.apache.spark_spark-connect-client-jvm_2.12-3.4.0.jar:/Users/hage/code/lib/spark/spark/jars/*" com.acme.hage.SparkConnectTest