Receiving result from Spark job launched using SparkLauncher

I am launching a Spark job using the following code:

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Process sparkProcess;
        try {
            sparkProcess = new SparkLauncher()
                    .setSparkHome("C:\\spark-2.0.0-bin-hadoop2.7")
                    .setAppResource("hdfs://server:9000/inputs/test.jar")
                    .setMainClass("com.test.TestJob")
                    .setMaster("spark://server:6066") // REST URL of the Spark master
                    .setVerbose(true)
                    .setDeployMode("cluster")
                    .addAppArgs("abc")
                    .launch();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Integer> submit = executorService.submit(() -> {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(sparkProcess.getInputStream()))) {
                while (sparkProcess.isAlive()) {
                    try {
                        System.out.println("input stream line:" + reader.readLine());
                        Thread.sleep(1000);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
            return sparkProcess.exitValue();
        });
        System.out.println("Exit value:" + submit.get());
        sparkProcess.waitFor();
    }
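
As far as I understand, spark-submit writes most of its log output to stderr rather than stdout, so I may need to drain both streams to see everything the child process prints. A minimal sketch of that (the StreamDrainer helper name is just illustrative):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;

    final class StreamDrainer {
        // Start a daemon thread that copies every line from the given stream
        // to stdout with a tag, until the child process closes the stream.
        static void drain(InputStream in, String tag) {
            Thread t = new Thread(() -> {
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        System.out.println(tag + "> " + line);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }, tag + "-drainer");
            t.setDaemon(true);
            t.start();
        }
    }

    // Usage, right after launch():
    // StreamDrainer.drain(sparkProcess.getInputStream(), "stdout");
    // StreamDrainer.drain(sparkProcess.getErrorStream(), "stderr");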

The main class in my test.jar is as follows:

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("test").config("spark.executor.memory", "1g")
                .config("spark.executor.cores", 1).getOrCreate();
        JavaSparkContext context = new JavaSparkContext(spark.sparkContext());
        long count = context.parallelize(Arrays.asList(1, 2, 3, 4, 5)).count();
        System.out.println("Count:" + count); // want to retrieve this value in the launching application
        spark.stop();
    }

I want to retrieve the count in the launching application once the Spark job completes successfully. However, I always receive null from the InputStream of sparkProcess. What am I doing wrong here?
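
For reference, there is also the listener-based SparkLauncher#startApplication API (SparkAppHandle, available since Spark 1.6). As far as I can tell it reports job state and the application id, but not the driver's stdout, so it would not give me the count either. A minimal sketch of that approach:

    import java.util.concurrent.CountDownLatch;
    import org.apache.spark.launcher.SparkAppHandle;
    import org.apache.spark.launcher.SparkLauncher;

    public class ListenerLaunch {
        public static void main(String[] args) throws Exception {
            CountDownLatch finished = new CountDownLatch(1);
            SparkAppHandle handle = new SparkLauncher()
                    .setSparkHome("C:\\spark-2.0.0-bin-hadoop2.7")
                    .setAppResource("hdfs://server:9000/inputs/test.jar")
                    .setMainClass("com.test.TestJob")
                    .setMaster("spark://server:6066")
                    .setDeployMode("cluster")
                    .addAppArgs("abc")
                    .startApplication(new SparkAppHandle.Listener() {
                        @Override
                        public void stateChanged(SparkAppHandle h) {
                            System.out.println("state: " + h.getState());
                            if (h.getState().isFinal()) {
                                finished.countDown(); // FINISHED, FAILED, or KILLED
                            }
                        }

                        @Override
                        public void infoChanged(SparkAppHandle h) {
                            System.out.println("app id: " + h.getAppId());
                        }
                    });
            finished.await(); // block until the job reaches a final state
            System.out.println("final state: " + handle.getState());
        }
    }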
