I am trying to learn Apache Spark. This is the code I am trying to run, using the PySpark API:
```python
data = xrange(1, 10000)
xrangeRDD = sc.parallelize(data, 8)

def ten(value):
    """Return whether value is below ten.

    Args:
        value (int): A number.

    Returns:
        bool: Whether `value` is less than ten.
    """
    if value < 10:
        return True
    else:
        return False

filtered = xrangeRDD.filter(ten)

print filtered.collect()
print filtered.take(8)
```
`print filtered.collect()` gives `[1, 2, 3, 4, 5, 6, 7, 8, 9]` as output.
As per my understanding, `filtered.take(n)` will take n elements from the RDD and print them. I am trying two cases:

1) giving a value of n less than or equal to the number of elements in the RDD
2) giving a value of n greater than the number of elements in the RDD
I am using the PySpark application UI to see the number of jobs that run in each case. In the first case only one job runs, but in the second case five jobs run, as in the example calls below.
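For example, on the `filtered` RDD above (which contains only 9 elements), the two cases look like this:

```python
print filtered.take(9)    # case 1: n <= number of elements -> UI shows 1 job
print filtered.take(10)   # case 2: n >  number of elements -> UI shows 5 jobs
```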
I am not able to understand why this is happening. Thanks in advance.
`RDD.take` tries to evaluate as few partitions as possible.

If you `take(9)`, it will fetch partition 0 (job 1), find 9 items, and happily terminate.

If you `take(10)`, it will fetch partition 0 (job 1) and find 9 items. It needs one more. Since partition 0 had 9, it thinks partition 1 will probably have at least one more (job 2). But it doesn't! In 2 partitions it has found 9 items, so 4.5 items per partition so far. The formula divides that by 1.5 for pessimism and decides 10 / (4.5 / 1.5) = 3 partitions will do it. So it fetches partition 2 (job 3). Still nothing. So 3 items per partition so far, divided by 1.5, means we need 10 / (3 / 1.5) = 5 partitions. It fetches partitions 3 and 4 (job 4). Nothing. We have 1.8 items per partition, and 10 / (1.8 / 1.5) = 8. It fetches the last 3 partitions (job 5), and that's it.

The code for this algorithm is in RDD.scala. As you can see, it is nothing but heuristics. It usually saves some work, but in degenerate cases it can lead to unnecessarily many jobs.
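As an illustration only, here is a rough Python sketch of that scan-and-rescale logic. It is a simplified model of the loop described above, not the actual Spark source; `simulate_take` and the partition counts are made up for this example:

```python
def simulate_take(num, partition_counts):
    """Simulate how many jobs a take(num) would launch.

    partition_counts[i] is the number of matching items in partition i.
    Mirrors the heuristic described above: scan one partition first, then
    keep re-estimating how many more partitions are needed.
    """
    total_parts = len(partition_counts)
    found = 0          # matching items collected so far
    parts_scanned = 0  # partitions evaluated so far
    jobs = 0

    while found < num and parts_scanned < total_parts:
        if parts_scanned == 0:
            parts_to_try = 1                   # first job looks at one partition
        elif found == 0:
            parts_to_try = parts_scanned * 4   # nothing found yet: grow fast
        else:
            # estimate partitions needed, padded by the 1.5x pessimism factor
            estimate = int(1.5 * num * parts_scanned / found) - parts_scanned
            parts_to_try = max(estimate, 1)

        step = min(parts_to_try, total_parts - parts_scanned)
        found += sum(partition_counts[parts_scanned:parts_scanned + step])
        parts_scanned += step
        jobs += 1

    return jobs

# 8 partitions of 1..9999; only partition 0 contains the 9 values below ten
counts = [9, 0, 0, 0, 0, 0, 0, 0]
print simulate_take(9, counts)   # -> 1 job
print simulate_take(10, counts)  # -> 5 jobs
```

Running it with 9 reproduces the single job, and with 10 it walks through the partitions exactly as in the 5-job trace above.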