Kafka RoundRobin partitioner not distributing messages to all the partitions

I am trying to use Kafka's RoundRobinPartitioner class for distributing messages evenly across all the partitions. My Kafka topic configuration is as follows:

name: multischemakafkatopicodd

number of partitions: 16

replication factor: 2
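
For reference, the producer is configured roughly like the sketch below; the broker address matches the commands later in this question, while the serializers, the string payload and the wrapper class name are simplified placeholders.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RoundRobinPartitioner;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class RoundRobinProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.55.211:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // switch from the default partitioner to round robin
            props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                for (int i = 0; i < 100; i++) {
                    // no key, so partition selection is left entirely to the configured partitioner
                    producer.send(new ProducerRecord<>("multischemakafkatopicodd", "message-" + i));
                }
                producer.flush();
            }
        }
    }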

Say I produce 100 messages; then each partition should have 6 or 7 messages. But I am getting something like this:

sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:26
multischemakafkatopicodd:5:0
multischemakafkatopicodd:10:24
multischemakafkatopicodd:15:0
multischemakafkatopicodd:13:0
multischemakafkatopicodd:8:26
multischemakafkatopicodd:2:26
multischemakafkatopicodd:12:24
multischemakafkatopicodd:14:24
multischemakafkatopicodd:9:0
multischemakafkatopicodd:11:0
multischemakafkatopicodd:4:26
multischemakafkatopicodd:1:0
multischemakafkatopicodd:6:24
multischemakafkatopicodd:7:0
multischemakafkatopicodd:3:0

I thought that maybe I was not producing enough messages, so I tried with 1M records and set the number of partitions to an odd number:

topic: multischemakafkatopicodd

number of partitions: 31

replication factor: 2

...and I got this. This time the number of messages in each partition is somewhat evenly distributed.

sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:33845
multischemakafkatopicodd:5:34388
multischemakafkatopicodd:10:33837
multischemakafkatopicodd:20:33819
multischemakafkatopicodd:15:33890
multischemakafkatopicodd:25:34414
multischemakafkatopicodd:30:33862
multischemakafkatopicodd:26:34066
multischemakafkatopicodd:9:34088
multischemakafkatopicodd:11:34124
multischemakafkatopicodd:16:33802
multischemakafkatopicodd:4:34061
multischemakafkatopicodd:17:34977
multischemakafkatopicodd:3:34084
multischemakafkatopicodd:24:33849
multischemakafkatopicodd:23:34111
multischemakafkatopicodd:13:34062
multischemakafkatopicodd:28:33876
multischemakafkatopicodd:18:34098
multischemakafkatopicodd:22:34058
multischemakafkatopicodd:8:34079
multischemakafkatopicodd:2:33839
multischemakafkatopicodd:12:34075
multischemakafkatopicodd:29:34132
multischemakafkatopicodd:19:33924
multischemakafkatopicodd:14:34109
multischemakafkatopicodd:1:34088
multischemakafkatopicodd:6:33832
multischemakafkatopicodd:7:34080
multischemakafkatopicodd:27:34188
multischemakafkatopicodd:21:34684

I did the same test again but decreased the number of partitions to 8, and I got this result, where we can clearly see that some partitions have more than 150K messages while others have around 105K:

multischemakafkatopicodd:0:155927
multischemakafkatopicodd:5:105351
multischemakafkatopicodd:1:107382
multischemakafkatopicodd:4:160533
multischemakafkatopicodd:6:158007
multischemakafkatopicodd:7:105608
multischemakafkatopicodd:2:157934
multischemakafkatopicodd:3:105599

Am I doing anything wrong or is this how it is supposed to work? Why is there such an unequal distribution of messages?

If anyone can help me out, that would be great. Thanks.

1 Answer

Answered by aran (accepted):

From my understanding, the partitioner is working well. But you must be aware of the optimizations made by the producer in order to maximize performance:

  • The producer does not hand each message to the broker individually for every send call, as that would be overkill.

  • Round-robin guarantees a similar distribution, but the producer works by sending batches. This means it buffers a number of messages destined for a partition, which is selected by the remainder (not modulus!) operation in the RoundRobinPartitioner's code:

     int part = Utils.toPositive(nextValue) % availablePartitions.size();
    

nextValue comes from an AtomicInteger counter that is incremented by 1 on each send call for the topic. Thus, the remainder also increments by one, in a cyclic manner (for example, with 4 partitions: 0-1-2-3-0-1-2-3-...), assuming no partition is declared unavailable during the process. If that happens, the cycle could look like 0-1-2-(the 4th partition fails)-0-1-2-(the 4th partition is back)-3-0-...
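
A tiny standalone sketch of that counter behaviour (it only mimics the remainder cycle, not the real producer internals; the class name is just for illustration):

    import java.util.concurrent.atomic.AtomicInteger;

    public class RoundRobinCycle {
        public static void main(String[] args) {
            int availablePartitions = 4;              // as in the 4-partition example below
            AtomicInteger nextValue = new AtomicInteger(0);
            for (int msg = 0; msg < 8; msg++) {
                // same remainder operation as the partitioner; & 0x7fffffff mirrors Utils.toPositive
                int part = (nextValue.getAndIncrement() & 0x7fffffff) % availablePartitions;
                System.out.println("message " + msg + " -> partition " + part);
            }
            // prints partitions 0-1-2-3-0-1-2-3
        }
    }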


Example

  • Topic with 4 partitions
  • The producer's per-partition buffer holds 3 messages before a batch is sent

(The message number counter starts at 0: new AtomicInteger(0))

    MsgN % Partitions   Partition
        0%4                0
        1%4                1
        2%4                2
        3%4                3
        4%4                0
        5%4                1
        6%4                2 
        7%4                3
        8%4                0
        ...               ...

When the 9th message is produced, the buffer for the first partition is full (it already holds 3 messages) and hence ready to be sent to Kafka. If you stopped the process right there, the 4 partitions would look like this:

    Partition    Offset
       0           3
       1           0
       2           0
       3           0

When the 10th message is produced, the buffer for the second partition is also ready to be sent over the wire, and the topic would look like:

    Partition    Offset
       0           3
       1           3
       2           0
       3           0
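
Those first two flushes can be simulated in a few lines; this is only an illustration of the example above (4 partitions, 3 messages buffered per partition), not the real accumulator code:

    import java.util.concurrent.atomic.AtomicInteger;

    public class BatchingSimulation {
        public static void main(String[] args) {
            int partitions = 4;
            int batchCapacity = 3;                     // messages buffered per partition before "sending"
            int[] buffered = new int[partitions];      // in flight, not yet visible as offsets
            int[] offsets = new int[partitions];       // what GetOffsetShell would report
            AtomicInteger counter = new AtomicInteger(0);

            for (int msg = 0; msg < 10; msg++) {
                int part = counter.getAndIncrement() % partitions;
                buffered[part]++;
                if (buffered[part] == batchCapacity) { // batch full: flush it to the "broker"
                    offsets[part] += buffered[part];
                    buffered[part] = 0;
                }
            }
            for (int p = 0; p < partitions; p++) {
                System.out.println("Partition " + p + " offset " + offsets[p]);
            }
            // After 10 messages: partitions 0 and 1 show offset 3, partitions 2 and 3 show 0.
        }
    }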

In real life, the buffer usually holds a large number of messages (this can also be tuned). Say, for example, that 1000 messages are stored per partition. In the same scenario, the partitions would look like:

    Partition    Offset
       0           1000
       1           1000
       2           0
       3           0

This increases the "visual" difference between partitions, and it becomes more noticeable the larger the batch/buffer size is.

This is inherent to the producer's sending mechanism: by default it does not send each message independently, but stores them so that multiple messages can be sent with each broker call, optimizing the system's performance.

Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer will attempt to accumulate data in memory and to send out larger batches in a single request
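
For completeness, the producer settings that govern this batching are batch.size, linger.ms and buffer.memory. A sketch of where they would go, reusing the props object of a standard producer configuration; the values shown are the client defaults, included only to make the knobs explicit:

    // added to the producer Properties before creating the KafkaProducer
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);       // max bytes buffered per partition batch
    props.put(ProducerConfig.LINGER_MS_CONFIG, 0);            // how long to wait for more records before sending
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // total memory for buffering across partitions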

This imbalance can be more noticeable if the producer is stopped and started, because the mechanism restarts without any memory of the previously selected partitions (so it could begin sending to the same partition that was selected just before the stop, widening the gap with the partitions that were not selected in the previous run).

In a new execution the buffers are all empty, so the cycle restarts regardless of which partitions received the most messages before.

So, you stop the process here:

    Partition    Offset
       0           1000
       1           1000
       2           0
       3           0

The map that holds the per-topic message counter is reset, because it is not part of the broker but of the producer's Partitioner class. If the producer is not closed and/or flushed properly, those buffered messages are also lost. So, in this scenario, what you get is a repeat of the previous logic:

    MsgN % Partitions   Partition
        0%4                0
        1%4                1
        2%4                2
        3%4                3
                 (...)

At some point, that will lead to this:

    Partition    Offset
       0          2000
       1          2000
       2           0
       3           0

This is an imbalance produced by the non-continuous execution of the sending process, but it is outside the scope of the RoundRobinPartitioner, whose design assumes a continuous (non-stop) process.

You can verify this behaviour by checking each partition's offset while sending the messages: only when the currently selected partition has stored its n messages will the next selected partition get its batch of n messages.
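
Besides the GetOffsetShell command used in the question, one way to watch this live is to poll the end offsets from a consumer while the producer is running. A minimal sketch, assuming a plain consumer pointed at the same broker (the class name is just for illustration):

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.stream.Collectors;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class PartitionOffsetCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.55.211:9092");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                List<TopicPartition> partitions = consumer.partitionsFor("multischemakafkatopicodd").stream()
                        .map(p -> new TopicPartition(p.topic(), p.partition()))
                        .collect(Collectors.toList());
                // endOffsets returns the next offset to be written per partition,
                // i.e. the same numbers GetOffsetShell prints with --time -1
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
                endOffsets.forEach((tp, offset) ->
                        System.out.println(tp.partition() + " -> " + offset));
            }
        }
    }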

Note: the numbers shown in the examples describe a "perfect" scenario; in real life, messages can be retried, fail, or be removed by compaction, batches can be flushed regardless of the buffer size, partitions can become unavailable, and so on, leading to offset numbers like the ones shown in your question.

Last example with a flush scenario:

    Partition    Offset
       0           1000
       1           1000
       2           0
       3           0

The process is stopped, but this time the producer is closed correctly and flushes its messages, so the topic looks like:

    Partition    Offset
       0           1997
       1           1996
       2           999
       3           998

The process is restarted. After the first partition's buffer is flushed, the topic would look like:

    Partition    Offset
       0           2997
       1           1996
       2           999
       3           998

This adds to the confusion about the mechanism's "fairness". But it is not the partitioner's fault: there is no persistence of the partitioner's map, counter, or buffers. If you let the process run for days without stopping, you will find that it really does balance the messages in a near-equal way.
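
To avoid losing the buffered messages when a run ends, flush and close the producer explicitly; a minimal sketch, assuming producer is your KafkaProducer instance:

    // "producer" here stands for your KafkaProducer instance
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        producer.flush(); // push out whatever is still sitting in the per-partition buffers
        producer.close(); // close() also flushes, but being explicit does no harm
    }));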


RoundRobinPartitioner's relevant methods:

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value,
                     byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    int nextValue = nextValue(topic);
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (!availablePartitions.isEmpty()) {
        // remainder operation that selects the next partition
        int part = Utils.toPositive(nextValue) % availablePartitions.size();
        return availablePartitions.get(part).partition();
    } else {
        // no partitions are available, give a non-available partition
        return Utils.toPositive(nextValue) % numPartitions;
    }
}

private int nextValue(String topic) {
    // Counter of messages sent per topic. topicCounterMap belongs to the producer
    // process, so it is not persisted: it starts at 0 for every topic on each new launch.
    AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
    return counter.getAndIncrement();
}