I would like to understand the behavior of a process. Basically, this Spark process must create at most five files, one for each territory, and save them into HDFS.
Territories are provided as an array of five strings. But when I look at the Spark UI, I see the same action being executed many times.
These are my questions:
- Why has the isEmpty action been executed 4 times for each territory instead of once? I expected just one action per territory.
- How is the number of tasks decided when isEmpty is calculated? The first time there is just one task, the second time there are 4 tasks, the third 20 and the fourth 35. What is the logic behind that sizing? Can I control that number in some way?
NOTE: if someone has a more "big data" solution for accomplishing the same process goal, please suggest it.
This is the code excerpt for the Spark process:
class IntegrationStatusD1RequestProcess {

  logger.info("Retrieving all measurement points from DB")
  val allMPoints = registryData.createIncrementalRegistryByMPointID()
    .setName("allMPoints")
    .persist(StorageLevel.MEMORY_AND_DISK)

  logger.info("getTerritories always returns an array of five strings")
  intStatusHelper.getTerritories.foreach { territory =>
    logger.info(s"Retrieving measurement points for territory $territory")

    val intStatusesChanged = allMPoints
      .filter { m => m.getmPoint.substring(0, 3) == territory }
      .setName(s"intStatusesChanged_${territory}")
      .persist(StorageLevel.MEMORY_AND_DISK)

    intStatusesChanged.isEmpty match {
      case true  => logger.info(s"No changes detected for territory $territory")
      case false =>
        // create the file and save it into HDFS
    }
  }
}
This is the image showing all the Spark jobs:
isEmpty is inefficient if you expect it to be true!
Here's the RDD code for isEmpty:
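(as found in org.apache.spark.rdd.RDD in Spark 2.x; the exact source may differ slightly between versions)

def isEmpty(): Boolean = withScope {
  partitions.length == 0 || take(1).length == 0
}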
It calls take. This is an efficient implementation if you think the RDD isn't empty, but is a horrible implementation if you think that it is.

The implementation of take follows this recursive step, starting at parts = 1:
items.n
parts = parts * 4
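Note that in recent Spark releases take does not re-collect from the first partition on each retry: it scans only the partitions it has not visited yet, quadrupling the batch size via numPartsToTry = partsScanned * 4 (the default spark.rdd.limit.scaleUpFactor is 4). A minimal sketch of that schedule, where takeJobSizes is a hypothetical helper and not a Spark API:

def takeJobSizes(totalPartitions: Int): Seq[Int] = {
  // Simulates how many tasks each successive take(1) job launches when every
  // partition turns out to be empty, assuming a scale-up factor of 4 and that
  // each retry scans only partitions not yet visited.
  val sizes = Seq.newBuilder[Int]
  var partsScanned = 0
  while (partsScanned < totalPartitions) {
    val numPartsToTry = if (partsScanned == 0) 1 else partsScanned * 4
    val batch = math.min(numPartsToTry, totalPartitions - partsScanned)
    sizes += batch
    partsScanned += batch
  }
  sizes.result()
}

takeJobSizes(60) returns Seq(1, 4, 20, 35): exactly the task counts of the four isEmpty jobs in your UI, which suggests allMPoints has 60 partitions. So the sizing is driven by the partition count of the filtered RDD; you can influence it by repartitioning (repartition / coalesce) before the filter, and in newer Spark versions by tuning spark.rdd.limit.scaleUpFactor.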
This implementation strategy lets the execution short-circuit if the RDD has more elements than you want to take, which is usually true. But if your RDD has fewer elements than you want to take, you end up computing partition #1 log4(nPartitions) + 1 times, partitions #2-4 log4(nPartitions) times, partitions #5-16 log4(nPartitions) - 1 times, and so on.

A better implementation for this use case
This implementation only computes each partition once by sacrificing short-circuit capability:
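A minimal sketch of that idea (fastNonEmpty is a name chosen here, not a Spark API):

import org.apache.spark.rdd.RDD

def fastNonEmpty(rdd: RDD[_]): Boolean = {
  // One task per partition, all in a single job: each task reports only
  // whether its partition holds at least one element, and the partial
  // results are OR-ed together. No retries, but also no short-circuit.
  rdd.mapPartitions(it => Iterator.single(it.hasNext)).fold(false)(_ || _)
}

In the loop above you would then replace intStatusesChanged.isEmpty with !fastNonEmpty(intStatusesChanged), turning the four isEmpty jobs per territory into a single job.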