Kafka client API questions


Can anyone help me with the queries below? I am using kafka-clients 0.10.1.1 (single node, single broker).

The default value of auto.create.topics.enable is true.

1.) I am sending messages to a topic using:

    KafkaProducer<String, String> producer = ...
    producer.send(new ProducerRecord<String, String>("my-topic", "message"));
    producer.close();

For consuming:

    KafkaConsumer<String, String> consumer = ...
    consumer.subscribe(Arrays.asList("my-topic"));
    ConsumerRecords<String, String> records = consumer.poll(200);

    while (true) {
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
        }
    }

The problem is that when I run the consumer for the first time, it does not get any values. I have to run the producer and then run the consumer again to get the values; sometimes I have to run the producer three times. Why does it work this way?

2.) enable.auto.commit=false

Can the same consumer read a message multiple times if the enable.auto.commit property is false?

3.) Considering my consumer code in the first point: how can I break the loop? In other words, how can the consumer know it has read all messages, so that it can call consumer.close()?

1 Answer

Answered by Luciano Afranllie (accepted answer):

1) Are you always using the same group.id in the consumer? Are you producing before consuming? This may be related to consumer groups and offset management. Please see this answer about consumer offset behavior.
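For illustration, here is a minimal consumer-configuration sketch. The broker address, group.id, and the choice of auto.offset.reset=earliest are assumptions, not values from the question; with a stable group.id and earliest offset reset, a consumer that has no committed offsets yet will read messages that were produced before it started.

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class OffsetResetExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // assumed single-broker address
            props.put("group.id", "my-consumer-group");         // keep this stable across runs
            props.put("auto.offset.reset", "earliest");         // read from the start when no committed offset exists
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("my-topic"));

            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
            consumer.close();
        }
    }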

2) It's not clear whether you mean reading duplicates intentionally or by accident. You can always read the same message again by seeking to its position, as long as the message has not been deleted by the topic retention policy. If you mean by accident: setting auto-commit to false just means that the consumer will not commit offsets for you; you have to do it manually by calling commitSync() or commitAsync(). In any case, there is still a chance that your consumer processes a message and crashes before committing; when the consumer recovers, it will read those processed-but-not-committed messages again. If you want exactly-once semantics you have to do something else, like storing offsets atomically together with the processed messages.
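As a sketch of the manual-commit flow, assuming the consumer from the sketch above was created with enable.auto.commit set to "false" (the println stands in for real processing, and partition 0 in the seek example is an assumption):

    // props.put("enable.auto.commit", "false");  // added to the configuration above

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(200);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());   // "process" the message
        }
        consumer.commitSync();                    // commit only after the batch was processed
    }

    // To re-read intentionally, seek back to an earlier offset
    // (seek only works for partitions currently assigned to this consumer):
    // consumer.seek(new org.apache.kafka.common.TopicPartition("my-topic", 0), 0L);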

3) As Lhfcws mentioned, in a stream there is no concept of "all messages". Some things (tricks) that you can do are:

  • You can check whether the record list returned by poll() is empty and, after some configured number of consecutive empty polls, break the loop and exit (see the sketch after this list).
  • If messages are ordered (you are reading from a single partition), you can send a special END_OF_DATA message; when you see it, close the consumer.
  • You can make the consumer read a fixed number of messages and then exit; next time it will continue from the last committed offset.
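A minimal sketch of the first trick, reusing the consumer from point 1 (the threshold of 5 consecutive empty polls is an arbitrary assumption):

    int emptyPolls = 0;
    while (emptyPolls < 5) {                      // give up after 5 consecutive empty polls
        ConsumerRecords<String, String> records = consumer.poll(200);
        if (records.count() == 0) {
            emptyPolls++;
            continue;
        }
        emptyPolls = 0;                           // reset the counter whenever data arrives
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
        }
    }
    consumer.close();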