Mongo is missing data updates. I have a spring-kafka listener method where I get event messages in high frequency. I increment the count by 1 whenever I get a message. The purpose is to track how many events I received. The problem is, there seems to be some missed updates in mongo. Say for example I get 40K events in total on the topic ( by counting the number of log statements in the listener method), but the currentCount in mongo is always less than 40K over various tests I have done. The kafkaContainerFactory concurrency is set to 1 and there is only one consumer and 1 partition for the topic. When I ran a simple loop of 40K, the count is correct in mongo. This lead me to believe this has something to do with the kafka listener.
@KafkaListener(topics = "${topic.event}", clientIdPrefix = "${client.event}",
containerFactory = "kafkaContainerFactory")
public void listenForEvent(ConsumerRecord<String, String> cr,
@Payload String message) throws IOException {
log.debug(" Partition {} received key {}: | Payload: {} | Record: {}", cr.partition(), cr.key(), message, cr.toString());
Response response = objectMapper.readValue(message,Response.class);
mongoOperations.updateFirst(new Query(Criteria.where("eventID").is(response.getEventID())),
new Update().inc("currentCount", 1),
Event.class);
}
Reference: https://www.mongodb.com/blog/post/fast-updates-with-mongodb-update-in-place