How to guarantee effective cluster resource utilization with Futures in Spark


I want to run multiple Spark SQL queries in parallel on a Spark cluster, so that I can utilize the resources of the whole cluster. I'm using sqlContext.sql(query).

I saw some sample code like the following,

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

val parallelism = 10
val executor = Executors.newFixedThreadPool(parallelism)
// make the context implicit so both Future { ... } and Future.sequence pick it up
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)

val tasks: Seq[String] = ??? // the SQL statements to run
val results: Seq[Future[Int]] = tasks.map { query =>
  Future {
    // spark stuff here, e.g. sqlContext.sql(query) plus an action
    0
  }
}
val allDone: Future[Seq[Int]] = Future.sequence(results)
// wait for all results
Await.result(allDone, Duration.Inf)
executor.shutdown() // otherwise the JVM will probably not exit

As I understand it, the ExecutionContext works out the available cores on the machine (using a ForkJoinPool) and sets its parallelism accordingly. But what happens when we consider a Spark cluster rather than a single machine, and how can this guarantee complete cluster-wide resource utilization?

e.g.: If I have a 10-node cluster with 4 cores on each node, how can the above code guarantee that all 40 cores will be utilized?
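To make the concern concrete, here is a small sketch (the numbers are only illustrative) of why the driver-side pool knows nothing about executor cores:

// Sketch: the default ExecutionContext sizes its thread pool from the driver JVM,
// not from the cluster.
val driverCores = Runtime.getRuntime.availableProcessors() // cores of the driver machine only
// scala.concurrent.ExecutionContext.global is backed by a ForkJoinPool of roughly this size,
// so it knows nothing about the 10 nodes * 4 cores = 40 cores available to the executors.
println(s"driver-side parallelism hint: $driverCores")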

EDIT:

Let's say there are 2 SQL queries to be executed; we have 2 ways to do this (see the sketch after the list below):

  1. Submit the queries sequentially, so that the second query is executed only after the first one has completed (because sqlContext.sql(query) is a synchronous call).

  2. Submit both queries in parallel using Futures, so that both queries are executed independently and in parallel in the cluster, assuming there are enough resources (in both cases).
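A minimal sketch of the two options, assuming sqlContext and two placeholder query strings query1 and query2 are already in scope (count() is only used here to force execution):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Option 1: sequential - the second query runs only after the first has finished,
// because each count() blocks the driver thread.
val r1 = sqlContext.sql(query1).count()
val r2 = sqlContext.sql(query2).count()

// Option 2: both submitted at once via Futures - the driver thread is not blocked,
// and Spark's scheduler can run both jobs concurrently if resources allow.
val f1 = Future { sqlContext.sql(query1).count() }
val f2 = Future { sqlContext.sql(query2).count() }
val both = Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)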

I think the second one is better, because it uses the maximum resources available in the cluster, and if the first query fully utilizes the resources, the scheduler will make the second job wait for its completion (depending on the scheduling policy), which is fair in this case.

But as user9613318 mentioned, 'increasing pool size will saturate the driver'. How, then, can I efficiently control the threads for better resource utilization?


1 Answer

Answer by Alper t. Turker:

Parallelism will have a minimal impact here, and additional cluster resources don't really affect the approach. Futures (or Threads) are used not to parallelize execution, but to avoid blocking it. Increasing the pool size can only saturate the driver.

What you should really be looking at are Spark in-application scheduling pools and tuning the number of partitions for narrow (How to change partition size in Spark SQL, Whats meaning of partitionColumn, lowerBound, upperBound, numPartitions parameters?) and wide (What should be the optimal value for spark.sql.shuffle.partitions or how do we increase partitions when using Spark SQL?) transformations.
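A minimal sketch of what that can look like (the pool name and the partition count are only placeholders):

// Enable FAIR scheduling (the default is FIFO), e.g. in spark-defaults.conf or on the SparkConf:
//   spark.scheduler.mode=FAIR
//   spark.scheduler.allocation.file=/path/to/fairscheduler.xml

// Jobs submitted from the current thread go to a named pool defined in fairscheduler.xml:
sc.setLocalProperty("spark.scheduler.pool", "pool1")

// Number of partitions used by wide transformations (shuffles) in Spark SQL:
sqlContext.setConf("spark.sql.shuffle.partitions", "200")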

If the jobs are completely independent (the code structure suggests that), it could be preferable to submit each one separately, with its own set of allocated resources, and to configure cluster scheduling pools accordingly.
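For example, each query could run as its own application with its own executor resources (the values below are only illustrative):

// One SparkConf per independently submitted application; each gets its own executors.
val conf = new org.apache.spark.SparkConf()
  .setAppName("independent-query-1")
  .set("spark.executor.instances", "5") // executors dedicated to this application
  .set("spark.executor.cores", "2")
  .set("spark.executor.memory", "4g")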