I am launching a Spark job from Java using the following code:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.spark.launcher.SparkLauncher;

public static void main(String[] args) throws InterruptedException, ExecutionException {
    Process sparkProcess;
    try {
        sparkProcess = new SparkLauncher()
                .setSparkHome("C:\\spark-2.0.0-bin-hadoop2.7")
                .setAppResource("hdfs://server:9000/inputs/test.jar")
                .setMainClass("com.test.TestJob")
                .setMaster("spark://server:6066") // REST URL of the Spark master
                .setVerbose(true)
                .setDeployMode("cluster")
                .addAppArgs("abc")
                .launch();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    // Read the child process's stdout while it is alive, then report its exit code.
    Future<Integer> submit = executorService.submit(() -> {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(sparkProcess.getInputStream()))) {
            while (sparkProcess.isAlive()) {
                try {
                    // Always prints "input stream line:null".
                    System.out.println("input stream line:" + reader.readLine());
                    Thread.sleep(1000);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        return sparkProcess.exitValue();
    });
    System.out.println("Exit value:" + submit.get());
    sparkProcess.waitFor();
    executorService.shutdown();
}
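For what it's worth, the launched process also exposes an error stream. I believe spark-submit may write its log output to stderr rather than stdout (an assumption I have not confirmed), so I could drain that stream the same way on a second thread. A minimal sketch that would sit next to the reader above:

ExecutorService errExecutor = Executors.newSingleThreadExecutor();
errExecutor.submit(() -> {
    // Drain stderr of the child process, in case spark-submit writes its
    // logs there (an assumption; nothing useful arrives on stdout).
    try (BufferedReader err = new BufferedReader(new InputStreamReader(sparkProcess.getErrorStream()))) {
        String line;
        while ((line = err.readLine()) != null) {
            System.out.println("error stream line: " + line);
        }
    }
    return null;
});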
The main method in my test.jar is as follows:
import java.util.Arrays;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("test").config("spark.executor.memory", "1g")
            .config("spark.executor.cores", 1).getOrCreate();
    JavaSparkContext context = new JavaSparkContext(spark.sparkContext());
    long count = context.parallelize(Arrays.asList(1, 2, 3, 4, 5)).count();
    System.out.println("Count:" + count); // I want to retrieve this value in the launching application.
    spark.stop();
}
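A fallback I would rather avoid is to persist the count somewhere the launching application can read it back, for example as a text file on HDFS (the output path below is hypothetical):

// Hypothetical fallback inside test.jar: write the result to HDFS instead of
// relying on the driver's stdout reaching the launcher.
context.parallelize(Arrays.asList(String.valueOf(count)))
        .saveAsTextFile("hdfs://server:9000/outputs/test-count");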
I want to retrieve the count in the launching application once the Spark job completes successfully. However, reading from the InputStream of sparkProcess always yields null (readLine() never returns a line).
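I also looked at SparkLauncher.startApplication(), which returns a SparkAppHandle instead of a raw Process. A minimal sketch is below; as far as I can tell the handle only reports state transitions and the application id, not the driver's stdout, so it does not obviously get me the count either:

import org.apache.spark.launcher.SparkAppHandle;

SparkAppHandle handle = new SparkLauncher()
        .setSparkHome("C:\\spark-2.0.0-bin-hadoop2.7")
        .setAppResource("hdfs://server:9000/inputs/test.jar")
        .setMainClass("com.test.TestJob")
        .setMaster("spark://server:6066")
        .setDeployMode("cluster")
        .startApplication(new SparkAppHandle.Listener() {
            @Override
            public void stateChanged(SparkAppHandle h) {
                // Fires on SUBMITTED, RUNNING, FINISHED, FAILED, etc.
                System.out.println("state changed: " + h.getState());
            }

            @Override
            public void infoChanged(SparkAppHandle h) {
                System.out.println("app id: " + h.getAppId());
            }
        });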
What am I doing wrong here?