Can anyone help me out from the below queries.I am using kafka-clients-0.10.1.1(Single Node Single Broker)
Default value of auto.create.topics.enable is true.
1.I am sending message to a topic using
kafkaProdcuer<String,String> producer> producer...
producer.send(new ProducerRecord<String, String>("my- topic","message"));
producer.close();
For Consuming :
kafkaConsumer<String,String> consumer....
consumer.subscribe(Arrays.asList("my-topic"));
ConsumerRecords<String, String> records = consumer.poll(200);
while(true){
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
The problem is when I run the consumer for the first time, It does not get values. And I must run the producer and run the consumer again to gets the values. Some times i have to run producer 3 times. Why is this working in this way?
2.) enable.auto.commit=false
Can same consumer read message multiple times if enable.auto.commit property is false?
3.)Considering my consumer code in 1st point.How can I break the loop I mean How can consumer know it has read all messages and then call consumer.close()
1) Are you always using the same group.id in the consumer? Are you producing before consuming? This may be related to consumer groups and offset management. Please see this answer about consumer offset behavior.
2) Not sure if you mean read duplicates intentionally or by accident. You can always read the same message again seeking to that position as long as the message as not been deleted due to topic retention policy. If you mean by accident, auto-commit set to false just means that consumer will not commit offsets for you, you have to do it manually calling commitSync() or commitAsync(). In any case there is still a chance of your consumer processing a message and crashing before committing, in that case when consumer recover it will read those processed-but-not-committed messages again. If you want exactly once semantic you have to do something else, like storing offsets atomically with processed messages.
3) As Lhfcws mentioned, in a stream there is no concept like "all messages". Some things (tricks) that you can do are: