Understanding Spark process behaviour


I would like to understand the behavior of this process. Basically, this Spark process must create at most five files, one for each territory, and save them into HDFS.

Territories are provided by an array of five strings, but when I look at the Spark UI, I see the same action being executed many times.

These are my questions:

  • Why has the isEmpty action been executed 4 times for each territory instead of once? I expected just one action per territory.
  • How is the number of tasks decided when isEmpty is computed? The first time there is just one task, the second time there are 4 tasks, the third 20 and the fourth 35. What is the logic behind that sizing? Can I control that number in some way?

NOTE: if someone has a more big-data-oriented solution to accomplish the same process goal, please suggest it.

This is the code excerpt for the Spark process:

class IntegrationStatusD1RequestProcess {

  logger.info(s"Retrieving all measurement point from DB")
  val allMPoints = registryData.createIncrementalRegistryByMPointID()
    .setName("allMPoints")
    .persist(StorageLevel.MEMORY_AND_DISK)

  logger.info("getTerritories return always an array of five String")
  intStatusHelper.getTerritories.foreach { territory =>

    logger.info(s"Retrieving measurement point for territory $territory")
    val intStatusesChanged = allMPoints
      .filter { m => m.getmPoint.substring(0, 3) == territory }
      .setName(s"intStatusesChanged_${territory}")
      .persist(StorageLevel.MEMORY_AND_DISK)

    intStatusesChanged.isEmpty match {
      case true => logger.info(s"No changes detected for territory $territory")
      case false =>
        // create the file and save it into HDFS
    }
  }
}

This image shows all the Spark jobs:

[image: Spark UI list of all jobs]

The first two images below show the isEmpty stages:

[image: isEmpty stage details]
[image: isEmpty stage details]

1 Answer

Answer by Tim (best answer):

isEmpty is inefficient if you expect it to be true!

Here's the RDD code for isEmpty:

def isEmpty(): Boolean = withScope {
  partitions.length == 0 || take(1).length == 0
}

It calls take. That is an efficient implementation when you expect the RDD not to be empty, but a horrible one when you expect it to be empty.

The implementation of take works iteratively, starting with a batch of one partition:

  1. Run a job over the next batch of partitions and add the results to a buffer.
  2. Check whether the buffer now contains >= n items.
  3. If yes, return the first n.
  4. If no, grow the batch to roughly 4x the number of partitions scanned so far, and repeat step 1.

This strategy lets the execution short-circuit as soon as the RDD yields more elements than you want to take, which is usually the case. But if the RDD is empty, no job ever fills the buffer, so take keeps launching jobs over ever-larger batches of partitions until all of them have been scanned. That is exactly the pattern in your UI: per territory, isEmpty runs four jobs with 1, 4, 20 and 35 tasks, which together cover all 60 partitions of your RDD (1 + 4 + 20 + 35 = 60). In recent Spark versions the growth factor can be tuned with spark.rdd.limit.scaleUpFactor (default 4), but with a take-based isEmpty you cannot avoid the repeated jobs themselves.
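As an illustration, here is a simplified sketch of that loop, modeled on Spark's RDD.take but with details trimmed (the real implementation grows the batch more conservatively once some rows have been found; this is not Spark's exact source):

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Simplified sketch of RDD.take's partition-scanning loop.
def takeSketch[T: ClassTag](rdd: RDD[T], num: Int): Array[T] = {
  val buf = ArrayBuffer.empty[T]
  val totalParts = rdd.partitions.length
  var partsScanned = 0
  while (buf.size < num && partsScanned < totalParts) {
    // Round 1 scans a single partition; each later round scans up to 4x the
    // number of partitions scanned so far, capped at the total: 1, 4, 20, ...
    val numPartsToTry = if (partsScanned == 0) 1 else partsScanned * 4
    val range = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
    val left = num - buf.size
    // One Spark job per round, over the next batch of partitions only.
    val res = rdd.sparkContext.runJob(rdd, (it: Iterator[T]) => it.take(left).toArray, range)
    res.foreach(buf ++= _)
    partsScanned += range.size
  }
  buf.take(num).toArray
}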

A better implementation for this use case

This implementation checks every partition in a single job, at the cost of the short-circuit capability:

import org.apache.spark.rdd.RDD

def fasterIsEmpty(rdd: RDD[_]): Boolean = {
  // Emit one Boolean per partition, then AND them together in a single job.
  rdd.mapPartitions(it => Iterator(it.isEmpty))
    .fold(true)(_ && _)
}