I am running Spark Streaming on YARN with:
spark-submit --master yarn --deploy-mode cluster --num-executors 2 --executor-memory 8g --driver-memory 2g --executor-cores 8 ..
I am consuming Kafka through the Direct Stream approach (no receiver). I have 2 topics, each with 3 partitions.
I repartition the RDD (I have one DStream) into 16 partitions, assuming number of executors * number of cores = 2 * 8 = 16 (is that correct?). Then I call foreachPartition, write each partition to a local file, and send it to another server (not Spark) over HTTP, using the Apache HTTP sync client with a pooling connection manager to POST it as multipart.
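To check the 2 * 8 = 16 assumption: the most tasks Spark can run at the same time is the number of executors times the task slots per executor, and each executor has executor-cores / spark.task.cpus slots (spark.task.cpus defaults to 1). The Python sketch below only does that arithmetic, using the --num-executors and --executor-cores values from the command above and assuming the default spark.task.cpus; the function name is made up for illustration.

```python
def concurrent_task_slots(num_executors: int, executor_cores: int, task_cpus: int = 1) -> int:
    """Upper bound on tasks that can run at once across all executors.

    Each executor offers executor_cores // task_cpus task slots;
    task_cpus mirrors spark.task.cpus (default 1).
    """
    if task_cpus <= 0:
        raise ValueError("task_cpus must be positive")
    return num_executors * (executor_cores // task_cpus)


# Values from the spark-submit command: --num-executors 2 --executor-cores 8
slots = concurrent_task_slots(num_executors=2, executor_cores=8)
print(slots)  # 16
```

This is a ceiling, not a guarantee: the scheduler may still place tasks on fewer executors, as the first Spark UI table shows.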
When I checked the details of this step (or is "job" the correct term?) in the Spark UI, it showed that all 16 tasks executed on a single executor, 8 tasks at a time.
Here are the Spark UI details:
Details for Stage 717 (Attempt 0)
Index  ID    Attempt  Status   Locality Level  Executor ID / Host               Launch Time          Duration  GC Time  Shuffle Read Size / Records  Errors
0      5080  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:46  2 s       11 ms    313.3 KB / 6137
1      5081  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:46  2 s       11 ms    328.5 KB / 6452
2      5082  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:46  2 s       11 ms    324.3 KB / 6364
3      5083  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:46  2 s       11 ms    321.5 KB / 6306
4      5084  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:46  2 s       11 ms    324.8 KB / 6364
5      5085  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:46  2 s       11 ms    320.8 KB / 6307
6      5086  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:46  2 s       11 ms    323.4 KB / 6356
7      5087  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:46  3 s       11 ms    316.8 KB / 6207
8      5088  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:48  2 s                317.7 KB / 6245
9      5089  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:48  2 s                320.4 KB / 6280
10     5090  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:48  2 s                323.0 KB / 6334
11     5091  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:48  2 s                323.7 KB / 6371
12     5092  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:48  2 s                316.7 KB / 6218
13     5093  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:48  2 s                321.0 KB / 6301
14     5094  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:48  2 s                321.4 KB / 6304
15     5095  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:49  2 s                319.1 KB / 6267
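The launch times and durations in the table above are consistent with one executor running 8 tasks at a time. The Python sketch below transcribes those values (seconds past 12:11:00) and counts how many task intervals overlap at each whole second. The UI rounds durations to whole seconds, so this is only an approximation, and the helper name is hypothetical.

```python
from collections import Counter

# (task index, executor ID, launch second past 12:11:00, duration in s),
# transcribed from the Stage 717 table; the UI rounds durations.
TASKS = (
    [(i, 1, 46, 2) for i in range(7)]
    + [(7, 1, 46, 3)]
    + [(i, 1, 48, 2) for i in range(8, 15)]
    + [(15, 1, 49, 2)]
)


def max_concurrency(tasks):
    """Largest number of tasks whose [launch, launch + duration) intervals overlap."""
    running = Counter()
    for _, _, start, duration in tasks:
        for second in range(start, start + duration):
            running[second] += 1
    return max(running.values())


executors_used = {executor for _, executor, _, _ in TASKS}
print(executors_used)          # {1}
print(max_concurrency(TASKS))  # 8
```

Task 15 starting at 12:11:49, a second after tasks 8-14, fits the picture of it waiting for task 7 (3 s) to free a core.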
I expected 16 tasks to run in parallel (2 executors * 8 cores) across one or more executors. I think I am missing something. Please help.
Update:
Incoming data is not evenly distributed. For example, in the first topic the second partition receives 5k * 5 = 25k messages per batch (5k = maxRatePerPartition, 5 s = batch interval), while the other two partitions sometimes have almost no data. The second topic has ~500-4000 messages per batch, evenly distributed across its 3 partitions.
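For reference, with the direct approach spark.streaming.kafka.maxRatePerPartition caps the number of records read per second from each Kafka partition, so the most one partition can contribute to a batch is that rate times the batch interval. The Python sketch below reproduces the 5k * 5 = 25k figure from the update; the helper name is made up, and the 6-partition line is only a hypothetical upper bound if every partition were saturated.

```python
def max_records_per_batch(max_rate_per_partition: int, batch_interval_s: int,
                          num_partitions: int = 1) -> int:
    """Cap on records a direct Kafka stream can pull in one batch.

    maxRatePerPartition is records/second/partition, so each partition
    contributes at most rate * batch interval records per batch.
    """
    return max_rate_per_partition * batch_interval_s * num_partitions


# Topic 1, busy partition: maxRatePerPartition = 5000, batch interval = 5 s
print(max_records_per_batch(5000, 5))     # 25000
# Hypothetical ceiling if all 6 partitions (2 topics x 3) were saturated
print(max_records_per_batch(5000, 5, 6))  # 150000
```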
When there is no data in topic 1, I see 16 tasks running in parallel across both executors:
Index  ID      Attempt  Status   Locality Level  Executor ID / Host               Launch Time          Duration  GC Time  Shuffle Read Size / Records  Errors
0      330402  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/28 04:31:41  1 s                19.2 KB / 193
1      330403  0        SUCCESS  NODE_LOCAL      2 / executor2_machine_host_name  2016/12/28 04:31:41  1 s                21.2 KB / 227
2      330404  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/28 04:31:41  1 s                20.8 KB / 214
3      330405  0        SUCCESS  NODE_LOCAL      2 / executor2_machine_host_name  2016/12/28 04:31:41  1 s                20.9 KB / 222
4      330406  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/28 04:31:41  2 s                21.0 KB / 222
5      330407  0        SUCCESS  NODE_LOCAL      2 / executor2_machine_host_name  2016/12/28 04:31:41  1 s                20.5 KB / 213
6      330408  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/28 04:31:41  1 s                20.4 KB / 207
7      330409  0        SUCCESS  NODE_LOCAL      2 / executor2_machine_host_name  2016/12/28 04:31:41  1 s                19.2 KB / 188
8      330410  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/28 04:31:41  1 s                20.4 KB / 214
9      330411  0        SUCCESS  NODE_LOCAL      2 / executor2_machine_host_name  2016/12/28 04:31:41  1 s                20.1 KB / 206
10     330412  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/28 04:31:41  0.6 s              18.7 KB / 183
11     330413  0        SUCCESS  NODE_LOCAL      2 / executor2_machine_host_name  2016/12/28 04:31:41  1 s                20.6 KB / 217
12     330414  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/28 04:31:41  1 s                20.0 KB / 206
13     330415  0        SUCCESS  NODE_LOCAL      2 / executor2_machine_host_name  2016/12/28 04:31:41  1 s                20.7 KB / 216
14     330416  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/28 04:31:41  1 s                18.8 KB / 186
15     330417  0        SUCCESS  NODE_LOCAL      2 / executor2_machine_host_name  2016/12/28 04:31:41  1 s                20.4 KB / 213
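Tallying the rows above by executor makes the even split concrete. The Python sketch below transcribes only the executor ID and shuffle-read record count of each task from the table; it is plain bookkeeping, not a Spark API.

```python
from collections import Counter

# (executor ID, shuffle-read records) per task, transcribed from the table above
ROWS = [
    (1, 193), (2, 227), (1, 214), (2, 222), (1, 222), (2, 213), (1, 207), (2, 188),
    (1, 214), (2, 206), (1, 183), (2, 217), (1, 206), (2, 216), (1, 186), (2, 213),
]

# Number of tasks each executor ran
tasks_per_executor = Counter(executor for executor, _ in ROWS)

# Total records each executor processed
records_per_executor = Counter()
for executor, records in ROWS:
    records_per_executor[executor] += records

print(dict(tasks_per_executor))    # {1: 8, 2: 8}
print(dict(records_per_executor))  # {1: 1625, 2: 1702}
```

That is about 3.3k records in total for the batch, in line with the ~500-4000 messages per batch described for topic 2.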
Try making the number of partitions equal to the number of executor cores: since you are giving each executor 8 cores, increase the number of partitions on the Kafka topic to 8. Also, check what happens if you do not repartition.
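With the direct approach, each Kafka topic-partition becomes one partition of the batch RDD, so the parallelism before any repartition is the sum of the partition counts of the subscribed topics. The Python sketch below compares the current setup (2 topics x 3 partitions) with 8 partitions on each topic; the topic names and the helper are hypothetical.

```python
def direct_stream_partitions(partitions_per_topic: dict) -> int:
    """RDD partitions per batch for a Kafka direct stream: one per topic-partition."""
    return sum(partitions_per_topic.values())


current = {"topic1": 3, "topic2": 3}    # hypothetical topic names
suggested = {"topic1": 8, "topic2": 8}  # 8 partitions per topic, as suggested above

print(direct_stream_partitions(current))    # 6
print(direct_stream_partitions(suggested))  # 16
```

Without a repartition, the first stage of each batch therefore has one task per Kafka partition, which is what the last suggestion would let you observe.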