spark-submit works for yarn-cluster mode but SparkLauncher doesn't, with same params


I'm able to submit a Spark job through spark-submit; however, when I try to do the same programmatically using SparkLauncher, I get nothing (I don't even see a Spark job on the UI).

Below is the scenario:

I have a server (say hostname: cr-hdbc101.dev.local:7123) which hosts the HDFS cluster. I push a fat jar to the server, which I'm trying to execute. The following spark-submit works as expected and a Spark job is submitted in yarn-cluster mode:

spark-submit \
      --verbose \
      --class com.digital.StartSparkJob \
      --master yarn \
      --deploy-mode cluster \
      --num-executors 2 \
      --driver-memory 2g \
      --executor-memory 3g \
      --executor-cores 4 \
      /usr/share/Deployments/Consolidateservice.jar "<arg_to_main>"

However, the following piece of SparkLauncher code doesn't work:

val sparkLauncher = new SparkLauncher()
    sparkLauncher
      .setSparkHome("/opt/cloudera/parcels/CDH-5.7.2-1.cdh5.7.2.p0.18/lib/spark")
      .setAppResource("/usr/share/Deployments/Consolidateservice.jar")
      .setMaster("yarn-cluster")
      .setVerbose(true)
      .setMainClass("com.digital.StartSparkJob")
      .setDeployMode("cluster")
      .setConf("spark.driver.cores", "2")
      .setConf("spark.driver.memory", "2g")
      .setConf("spark.executor.cores", "4")
      .setConf("spark.executor.memory", "3g")
      .addAppArgs(<arg_to_main>)
      .startApplication()

I thought maybe SparkLauncher wasn't getting the correct environment variables to work with, so I passed the following to SparkLauncher, but to no avail (basically I pass everything in spark-env.sh to SparkLauncher):

val env: java.util.Map[String, String] = new java.util.HashMap[String, String]
    env.put("SPARK_CONF_DIR", "/etc/spark/conf.cloudera.spark_on_yarn")
    env.put("HADOOP_HOME", "/opt/cloudera/parcels/CDH-5.7.2-1.cdh5.7.2.p0.18/lib/hadoop")
    env.put("YARN_CONF_DIR", "/etc/spark/conf.cloudera.spark_on_yarn/yarn-conf")
    env.put("SPARK_LIBRARY_PATH", "/opt/cloudera/parcels/CDH-5.7.2-1.cdh5.7.2.p0.18/lib/spark/lib")
    env.put("SCALA_LIBRARY_PATH", "/opt/cloudera/parcels/CDH-5.7.2-1.cdh5.7.2.p0.18/lib/spark/lib")
    env.put("LD_LIBRARY_PATH", "/opt/cloudera/parcels/CDH-5.7.2-1.cdh5.7.2.p0.18/lib/hadoop/lib/native")
    env.put("SPARK_DIST_CLASSPATH", "/etc/spark/conf.cloudera.spark_on_yarn/classpath.txt")

    val sparkLauncher = new SparkLauncher(env)
    sparkLauncher
      .setSparkHome("/opt/cloudera/parcels/CDH-5.7.2-1.cdh5.7.2.p0.18/lib/spark")...

What adds to the frustration is that when I use the same SparkLauncher code for yarn-client mode, it works perfectly fine.

Can someone please point out what I'm missing? I just feel I'm staring at the issue without recognizing it.

NOTE: Both the main class (com.digital.StartSparkJob) and the SparkLauncher code are part of the fat jar I'm pushing to the server. I just invoke the SparkLauncher code through an external API, which in turn should start a driver JVM on the cluster.

Spark version: 1.6.0, Scala version: 2.10.5


1 Answer

ni_i_ru_sama (best answer):

I wasn't even getting logs on the Spark UI... the Spark app wasn't even running. So I ran the SparkLauncher as a process (using .launch().waitFor()) so that I could capture the error logs.

I captured the logs using .getInputStream and .getErrorStream, and found out that the user being passed to the cluster was wrong. My cluster only works for the user "abcd".
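A minimal sketch of that launch-and-capture step, assuming the same jar, main class, and Spark home as in the question (the drainStream helper and log prefixes are illustrative, not part of the SparkLauncher API):

import org.apache.spark.launcher.SparkLauncher
import scala.io.Source

// Illustrative helper: drain a child-process stream on a background thread
// so the pipe buffer never fills up and blocks the spark-submit child.
def drainStream(in: java.io.InputStream, prefix: String): Unit =
  new Thread(new Runnable {
    override def run(): Unit =
      Source.fromInputStream(in).getLines().foreach(line => println(prefix + " " + line))
  }).start()

val process = new SparkLauncher()
  .setSparkHome("/opt/cloudera/parcels/CDH-5.7.2-1.cdh5.7.2.p0.18/lib/spark")
  .setAppResource("/usr/share/Deployments/Consolidateservice.jar")
  .setMainClass("com.digital.StartSparkJob")
  .setMaster("yarn-cluster")
  .setVerbose(true)
  .launch()                          // returns a plain java.lang.Process

drainStream(process.getInputStream, "[launcher stdout]")
drainStream(process.getErrorStream, "[launcher stderr]")

val exitCode = process.waitFor()     // block until the spark-submit child exits
println("launcher process exited with code " + exitCode)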

I did set System.setProperty("HADOOP_USER_NAME", "abcd"), and also added "spark.yarn.appMasterEnv.HADOOP_USER_NAME=abcd" to spark-defaults.conf, before launching the SparkLauncher. However, it looks like these don't get carried over to the cluster.

I therefore passed HADOOP_USER_NAME to the SparkLauncher as an environment variable for the child process:

val env: java.util.Map[String, String] = new java.util.HashMap[String, String]
    env.put("SPARK_CONF_DIR", "/etc/spark/conf.cloudera.spark_on_yarn")
    env.put("YARN_CONF_DIR", "/etc/spark/conf.cloudera.spark_on_yarn/yarn-conf")
    env.put("HADOOP_USER_NAME", "abcd")

try {
val sparkLauncher = new SparkLauncher(env)...
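For completeness, a hedged sketch of how the rest of that try block might look, reusing the env map above, the builder settings from the question, and the illustrative drainStream helper from the earlier sketch:

try {
  val process = new SparkLauncher(env)
    .setSparkHome("/opt/cloudera/parcels/CDH-5.7.2-1.cdh5.7.2.p0.18/lib/spark")
    .setAppResource("/usr/share/Deployments/Consolidateservice.jar")
    .setMainClass("com.digital.StartSparkJob")
    .setMaster("yarn-cluster")
    .setDeployMode("cluster")
    .setConf("spark.driver.memory", "2g")
    .setConf("spark.executor.cores", "4")
    .setConf("spark.executor.memory", "3g")
    .setVerbose(true)
    // .addAppArgs(...)  -- pass the same <arg_to_main> as in the question
    .launch()

  drainStream(process.getInputStream, "[launcher stdout]")
  drainStream(process.getErrorStream, "[launcher stderr]")

  val exitCode = process.waitFor()
  println("launcher process exited with code " + exitCode)
} catch {
  case e: Exception => e.printStackTrace()
}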