Spark streaming with Yarn: executors not fully utilized

715 views Asked by At

I am running spark streaming with Yarn with -

spark-submit --master yarn --deploy-mode cluster --num-executors 2 --executor-memory 8g --driver-memory 2g --executor-cores 8 ..

I am consuming Kafka through DireactStream approach (No receiver). I have 2 topics (each with 3 partitions).

I reparation RDD (i have one DStream) into 16 parts (assuming no of executor * num of cores = 2 * 8 = 16 Is it correct ?) and then i do foreachPartition and writes each partition to local file and then send it to other server (not spark) through http (Using apache http sync client with pooling manager via post with multi-part).

When i checked details of this step (or JOB is it correct naming?) through Spark UI, it showed that total 16 task executed on single executor with 8 task at a time.

This is Spark UI details -

Details for Stage 717 (Attempt 0)

Index  ID  Attempt Status  Locality Level  Executor ID / Host  Launch Time Duration  GC Time Shuffle Read Size / Records Errors
0  5080  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/27 12:11:46 2 s 11 ms 313.3 KB / 6137 
1  5081  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/27 12:11:46 2 s 11 ms 328.5 KB / 6452 
2  5082  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/27 12:11:46 2 s 11 ms 324.3 KB / 6364 
3  5083  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/27 12:11:46 2 s 11 ms 321.5 KB / 6306 
4  5084  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/27 12:11:46 2 s 11 ms 324.8 KB / 6364 
5  5085  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/27 12:11:46 2 s 11 ms 320.8 KB / 6307 
6  5086  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/27 12:11:46 2 s 11 ms 323.4 KB / 6356 
7  5087  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/27 12:11:46 3 s 11 ms 316.8 KB / 6207 
8  5088  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/27 12:11:48 2 s   317.7 KB / 6245 
9  5089  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/27 12:11:48 2 s   320.4 KB / 6280 
10  5090  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/27 12:11:48 2 s   323.0 KB / 6334 
11  5091  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/27 12:11:48 2 s   323.7 KB / 6371 
12  5092  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/27 12:11:48 2 s   316.7 KB / 6218 
13  5093  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/27 12:11:48 2 s   321.0 KB / 6301 
14  5094  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/27 12:11:48 2 s   321.4 KB / 6304 
15  5095  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/27 12:11:49 2 s   319.1 KB / 6267 

I was expecting it to execute 16 parallel task (2 executor * 8 core) on either one or more executor. I think i am missing something. Please help.

Update:

  1. Incoming data is not evenly distributed. e.g. 1st topic has 2nd partition with 5*5 = 25k messages (5k = maxRatePerPartition, 5s = batch interval) and other two partition has almost 0 data few times. The 2nd Topic has ~500-4000 message per batch which is evenly distributed across 3 partition.

  2. when there is no data in topic 1 then i see 16 parallel task processing across 2 executors.


Index ID  Attempt Status  Locality Level  Executor ID / Host  Launch Time Duration  GC Time Shuffle Read Size / Records Errors
0 330402  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/28 04:31:41 1 s   19.2 KB / 193 
1 330403  0 SUCCESS NODE_LOCAL  2 / executor2_machine_host_name  2016/12/28 04:31:41 1 s   21.2 KB / 227 
2 330404  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/28 04:31:41 1 s   20.8 KB / 214 
3 330405  0 SUCCESS NODE_LOCAL  2 / executor2_machine_host_name  2016/12/28 04:31:41 1 s   20.9 KB / 222 
4 330406  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/28 04:31:41 2 s   21.0 KB / 222 
5 330407  0 SUCCESS NODE_LOCAL  2 / executor2_machine_host_name  2016/12/28 04:31:41 1 s   20.5 KB / 213 
6 330408  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/28 04:31:41 1 s   20.4 KB / 207 
7 330409  0 SUCCESS NODE_LOCAL  2 / executor2_machine_host_name  2016/12/28 04:31:41 1 s   19.2 KB / 188 
8 330410  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/28 04:31:41 1 s   20.4 KB / 214 
9 330411  0 SUCCESS NODE_LOCAL  2 / executor2_machine_host_name  2016/12/28 04:31:41 1 s   20.1 KB / 206 
10  330412  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/28 04:31:41 0.6 s   18.7 KB / 183 
11  330413  0 SUCCESS NODE_LOCAL  2 / executor2_machine_host_name  2016/12/28 04:31:41 1 s   20.6 KB / 217 
12  330414  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/28 04:31:41 1 s   20.0 KB / 206 
13  330415  0 SUCCESS NODE_LOCAL  2 / executor2_machine_host_name  2016/12/28 04:31:41 1 s   20.7 KB / 216 
14  330416  0 SUCCESS NODE_LOCAL  1 / executor1_machine_host_name  2016/12/28 04:31:41 1 s   18.8 KB / 186 
15  330417  0 SUCCESS NODE_LOCAL  2 / executor2_machine_host_name  2016/12/28 04:31:41 1 s   20.4 KB / 213 
2

There are 2 answers

1
Shahbaz Hussain On

Try increasing number of partitions equal to number of executor cores,since you are giving 8 executor cores,increase number of partitions on Kafka topic to 8. Also ,check what happens if you do not do re partition.

0
Sachin On

Set below parameters with --num-executors 6

spark.default.parallelism

spark.streaming.concurrentJobs

Set above parameter values according to your requirement and environment. This will work for you.