I am storing the data from KafkaListener in a ConcurrentLinkedQueue to be processed. Currently it consumes as many data as it can and completely fills up RAM. How do I limit the number of messages in the queue so that when it reaches the limit the KafkaListener pauses.
ConcurrentLinkedQueue<Message> queue = new ConcurrentLinkedQueue<>();
@KafkaListener(
topics = "topic",
id = "topic-kafka-listener",
groupId = "batch-processor",
containerFactory = "kafkaListenerContainerFactory"
)
public void receive(@NotNull @Payload List<Message> messages) {
queue.addAll(messages);
}
How do I limit the queue size to say 1 million?
Whenever the queue is polled and there is free space it should start listening again.
OR
How do I limit the rate at which Kafka consumes messages to say 100,000 messages per second?
Instead of using annotation I used the
KafkaConsumerobject to poll for data manually. With this there is more control.