Kafka Node High Level Producer writes to even partitions only

978 views Asked by At

I am using the Kafka Node library, and testing the high level producer.

I've created a topic with 10 partitions, 'HLPTestInput', and written a function to produce to it every second.

The producer writes to partitions 0,2,4,6 and 8, but not to the odd ones.

Strangely, when I consume from this topic and produce to a second topic, 'HLPTestInputFromConsumer', which has 5 partitions, messages are written to all of them.

Is there a configuration I'm missing?

const kafka = require('kafka-node'),
    HighLevelProducer = kafka.HighLevelProducer,
    ConsumerGroup = kafka.ConsumerGroup,
    client = new kafka.KafkaClient({kafkaHost: 'smc-dev.silverbolt.lab:9092'}),
    producer = new HighLevelProducer(client),
    consumer = new ConsumerGroup(
        {
          kafkaHost: 'smc-dev.silverbolt.lab:9092',
            groupId: 'testGroup'
        },
        'HLPTestInput'
    );

let index = 0;
setInterval(() => {
    producer.send([{
        topic: 'HLPTestInput',
        messages: [index]
    }], (err, data) => {
        console.log('produced', data);
    });
    index++;
}, 1000);

consumer.on('message', (message) => {
    console.log('consumed', message);
    producer.send([{
        topic: 'HLPTestInputFromConsumer',
        messages: [message]
    }], (err, data) => {
        console.log('produced to secondary', data);
    });
});
1

There are 1 answers

0
Anuresh Verma On

I am not so sure but it may be because of you using the same producer to write on two different topics. As HighLevelProducer uses round-robin to write. So suppose your producer writes in "HLPTestInput" Topic then you set the time interval for 1000 so in this meantime, your consumer gets the message and Now your producer writes in "HLPTestInputFromConsumer" Topic.

So your producer writes "HLPTestInput" topic in its partitions 0,2,4...

and "HLPTestInputFromConsumer" topic in its partion 1,3,5 ...

So I will suggest just try to create another producer. Then it should work fine.

try below code:

const kafka = require('kafka-node'),
    HighLevelProducer = kafka.HighLevelProducer,
    ConsumerGroup = kafka.ConsumerGroup,
    client = new kafka.KafkaClient({kafkaHost: 'smc-dev.silverbolt.lab:9092'}),
    client1 = new kafka.KafkaClient({kafkaHost: 'smc-dev.silverbolt.lab:9092'}),
    producer = new HighLevelProducer(client),
    producer1 = new HighLevelProducer(client1),
    consumer = new ConsumerGroup(
       {
          kafkaHost: 'smc-dev.silverbolt.lab:9092',
           groupId: 'testGroup'
        },
        'HLPTestInput'
    );
let index = 0;
    setInterval(() => {
    producer.send([{
        topic: 'HLPTestInput',
        messages: [index]
    }], (err, data) => {
        console.log('produced', data);
    });
   index++;
}, 1000);

consumer.on('message', (message) => {
    console.log('consumed', message);
    producer1.send([{
        topic: 'HLPTestInputFromConsumer',
        messages: [message]
    }], (err, data) => {
        console.log('produced to secondary', data);
    });
});