I am trying to use Kafka's RoundRobinPartitioner class for distributing messages evenly across all the partitions. My Kafka topic configuration is as follows:
name: multischemakafkatopicodd
number of partitions: 16
replication factor: 2
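The producer is configured roughly like this (simplified; the serializers, payload, and send loop are only illustrative, not my real setup):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class RoundRobinProducerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.55.211:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        // use round-robin instead of the default partitioner
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                // no key, so the configured partitioner decides the target partition
                producer.send(new ProducerRecord<>("multischemakafkatopicodd", ("message-" + i).getBytes()));
            }
        }
    }
}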
Say I produce 100 messages: each partition should then end up with 6 or 7 messages. But I am getting something like this:
sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:26
multischemakafkatopicodd:5:0
multischemakafkatopicodd:10:24
multischemakafkatopicodd:15:0
multischemakafkatopicodd:13:0
multischemakafkatopicodd:8:26
multischemakafkatopicodd:2:26
multischemakafkatopicodd:12:24
multischemakafkatopicodd:14:24
multischemakafkatopicodd:9:0
multischemakafkatopicodd:11:0
multischemakafkatopicodd:4:26
multischemakafkatopicodd:1:0
multischemakafkatopicodd:6:24
multischemakafkatopicodd:7:0
multischemakafkatopicodd:3:0
I thought that maybe I was not producing enough messages, so I tried with 1M records and set the number of partitions to an odd number:
topic: multischemakafkatopicodd
number of partitions: 31
replication factor: 2
...and I got this. This time the number of messages in each partition is somewhat evenly distributed.
sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:33845
multischemakafkatopicodd:5:34388
multischemakafkatopicodd:10:33837
multischemakafkatopicodd:20:33819
multischemakafkatopicodd:15:33890
multischemakafkatopicodd:25:34414
multischemakafkatopicodd:30:33862
multischemakafkatopicodd:26:34066
multischemakafkatopicodd:9:34088
multischemakafkatopicodd:11:34124
multischemakafkatopicodd:16:33802
multischemakafkatopicodd:4:34061
multischemakafkatopicodd:17:34977
multischemakafkatopicodd:3:34084
multischemakafkatopicodd:24:33849
multischemakafkatopicodd:23:34111
multischemakafkatopicodd:13:34062
multischemakafkatopicodd:28:33876
multischemakafkatopicodd:18:34098
multischemakafkatopicodd:22:34058
multischemakafkatopicodd:8:34079
multischemakafkatopicodd:2:33839
multischemakafkatopicodd:12:34075
multischemakafkatopicodd:29:34132
multischemakafkatopicodd:19:33924
multischemakafkatopicodd:14:34109
multischemakafkatopicodd:1:34088
multischemakafkatopicodd:6:33832
multischemakafkatopicodd:7:34080
multischemakafkatopicodd:27:34188
multischemakafkatopicodd:21:34684
Again I ran the same test, but decreased the number of partitions to 8, and I got this result, where we can clearly see that some partitions have close to 155K messages while others have around 105K:
multischemakafkatopicodd:0:155927
multischemakafkatopicodd:5:105351
multischemakafkatopicodd:1:107382
multischemakafkatopicodd:4:160533
multischemakafkatopicodd:6:158007
multischemakafkatopicodd:7:105608
multischemakafkatopicodd:2:157934
multischemakafkatopicodd:3:105599
Am I doing anything wrong or is this how it is supposed to work? Why is there such an unequal distribution of messages?
If anyone can help me out, that would be great. Thanks.
From my understanding, the partitioner is working as it should, but you must be aware of the optimizations the producer makes in order to maximize performance:
The producer won't send each message out to its partition individually on every send call, as that would be overkill.
Round-Robin guarantees a similar distribution, but it works by sending batches. It buffers an amount of messages destined for a partition, based on the remainder (not modulus!) operation made in the RoundRobinPartitioner's code: nextValue is an AtomicInteger that increments by 1 for each partition/send call. Thus the remainder also increments by one in a cyclic manner (for example, with 4 partitions: 0-1-2-3-0-1-2-3-...), assuming no partition is declared unavailable during the process. If that happens, the cycle could look like 0-1-2-(partition 4 fails)-0-1-2-(partition 4 OK)-3-0-...
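A minimal sketch of just that counter logic, outside of Kafka (the class name and the 4-partition assumption are mine, purely for illustration):

import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinCycleSketch {
    public static void main(String[] args) {
        int numPartitions = 4;                          // matches the example above
        AtomicInteger nextValue = new AtomicInteger(0); // same idea as the partitioner's counter

        // One increment per send call; the remainder picks the partition.
        StringBuilder cycle = new StringBuilder();
        for (int i = 0; i < 8; i++) {
            int partition = nextValue.getAndIncrement() % numPartitions;
            cycle.append(partition).append(i < 7 ? "-" : "");
        }
        System.out.println(cycle); // prints 0-1-2-3-0-1-2-3
    }
}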
Example
(The message counter starts at 0: new AtomicInteger(0). For simplicity, say each partition's buffer holds 3 messages before it is sent.)
When the 9th message is produced, the buffer for the first partition is full (it already holds 3 messages) and hence ready to be sent to Kafka. If you stop the process right there, the 4 partitions would look like this:
partition 0: 3 messages
partition 1: 0 messages
partition 2: 0 messages
partition 3: 0 messages
When the 10th message is produced, the buffer for the second partition will also be ready to be sent out on the wire, and the topic would look like:
partition 0: 3 messages
partition 1: 3 messages
partition 2: 0 messages
partition 3: 0 messages
In real life, the buffer usually holds a large amount of messages (this can also be tuned). Let's say, for example, 1000 messages per partition buffer. For the same scenario, just after the first buffer is sent the partitions would look like:
partition 0: 1000 messages
partition 1: 0 messages
partition 2: 0 messages
partition 3: 0 messages
Hence incrementing the "visual" difference between partitions. It will be more notorious the bigger batch size / buffer size are.
This is related to the nature of the producer's partitioner thread itself: by default, it won't send each message independently, but will store them in order to send multiple messages on each broker call, optimizing the system's performance.
This imbalance can be more noticeable if the producer is stopped/started, as it restarts the mechanism regardless of the previously selected partitions (so it could start sending to the same partition that was elected just before being stopped, hence increasing the difference with the other, non-elected partitions from the last execution).
In a new execution, the buffers will all be empty, so the process restarts regardless of which partitions received the most messages.
So, say you stop the process here:
partition 0: 1000 messages
partition 1: 1000 messages
partition 2: 0 messages
partition 3: 0 messages
The map that holds the message counter for each topic is reset, as it's not part of the broker but of the producer's Partitioner class. If the producer is not closed properly and/or flushed, those cached messages will also be lost. So, in this scenario, what you get is a repeat of the previous logic, with the first batch of the new run going to partition 0 again:
partition 0: 2000 messages
partition 1: 1000 messages
partition 2: 0 messages
partition 3: 0 messages
That will lead, at some point, to something like this:
partition 0: 2000 messages
partition 1: 2000 messages
partition 2: 1000 messages
partition 3: 1000 messages
It's an imbalance produced by the non-continuous execution of the sending process, but it's outside the scope of the RoundRobinPartitioner, whose nature is based on a continuous (non-stop) process.
You can verify this behaviour by checking each partition's offset while the messages are being sent: only when the selected partition has stored its n messages will the next elected partition get its own batch of n messages.
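For example, here is a small sketch that prints the same per-partition end offsets as the GetOffsetShell command in your question (broker address and topic taken from there; the byte-array deserializers are just illustrative):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class EndOffsetsCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.55.211:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Build a TopicPartition for every partition of the topic...
            List<TopicPartition> partitions = consumer.partitionsFor("multischemakafkatopicodd").stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());
            // ...and print each one's latest offset (same info as GetOffsetShell with --time -1)
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            endOffsets.forEach((tp, offset) -> System.out.println(tp.partition() + ":" + offset));
        }
    }
}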
Note: the numbers shown in the examples describe a "perfect" scenario; in real life, messages can also be revoked, compacted, or failed, buffers can be flushed regardless of their size, partitions can become unavailable, ... leading to offset numbers such as those shown in your question.
Last example with a flush scenario:
The process is stopped, but the producer is correctly closed and flushes its messages, so the partially filled buffers are written as well and the topic looks like:
partition 0: 1000 messages
partition 1: 1000 messages
partition 2: 999 messages
partition 3: 999 messages
The process is restarted. After flushing the first partition's buffer, the topic would look like:
partition 0: 2000 messages
partition 1: 1000 messages
partition 2: 999 messages
partition 3: 999 messages
Hence increasing the confusion regarding the mechanism's "fairness". But it's not the partitioner's fault, as there is no persistence of its map, counter, or buffers. If you let the process run for days without stopping, you'll find that it really does balance the messages in a near-equal way.
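(For completeness, a minimal sketch of the "correctly closed and flushed" shutdown mentioned above, assuming a producer instance configured like the one in your question:)

import org.apache.kafka.clients.producer.KafkaProducer;

public class CleanShutdownSketch {
    // Call this before the process exits so no buffered (partial) batches are lost.
    static void shutDownCleanly(KafkaProducer<byte[], byte[]> producer) {
        // flush() blocks until every buffered record, including partially filled batches, has been sent
        producer.flush();
        // close() flushes as well and then releases the producer's resources
        producer.close();
    }
}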
RoundRobinPartitioner's relevant methods:
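(Reproduced here for reference, roughly as they appear in org.apache.kafka.clients.producer.RoundRobinPartitioner; your Kafka version may differ slightly. topicCounterMap is the class's per-topic ConcurrentMap of counters, and Utils is org.apache.kafka.common.utils.Utils.)

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    int nextValue = nextValue(topic);
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (!availablePartitions.isEmpty()) {
        // remainder over the available partitions only, so unavailable ones are skipped
        int part = Utils.toPositive(nextValue) % availablePartitions.size();
        return availablePartitions.get(part).partition();
    } else {
        // no partitions are available, give a non-available partition
        return Utils.toPositive(nextValue) % numPartitions;
    }
}

private int nextValue(String topic) {
    // one AtomicInteger per topic, starting at 0 and incremented on every send call
    AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
    return counter.getAndIncrement();
}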