Limit number of concurrently processed messages with the Spring Cloud Stream Kafka Reactive binder


Currently we are migrating our applications to the Spring Cloud Stream Kafka Reactive binder and are seeing very high memory usage. An incoming message triggers all kinds of processing here, so we would like to reduce the number of messages that Spring Cloud Stream offers to the Function<Flux<In>, Flux<Out>>.
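One knob that is independent of the binder is Reactor's own limitRate operator, which caps the demand the pipeline signals upstream instead of requesting an unbounded number of records. A minimal sketch of such a function bean (the In/Out types, the prefetch values, and the processing step are placeholders, not our real code):

```java
import java.util.function.Function;
import reactor.core.publisher.Flux;

public class ProcessorConfig {

    // Hypothetical message types standing in for the real payloads.
    record In(String payload) {}
    record Out(String payload) {}

    // The function Spring Cloud Stream binds to the input/output topics.
    // limitRate(64) bounds the demand signalled upstream, so at most 64
    // records are requested (and buffered) at a time instead of an
    // effectively unbounded request.
    public Function<Flux<In>, Flux<Out>> process() {
        return in -> in
                .limitRate(64)                 // bound upstream demand
                .concatMap(this::expensiveCall, 1); // one in-flight call at a time
    }

    private Flux<Out> expensiveCall(In msg) {
        // placeholder for the heavy processing each message triggers
        return Flux.just(new Out(msg.payload()));
    }
}
```

concatMap with a prefetch of 1 additionally serializes the expensive step; flatMap with a small concurrency argument would allow bounded parallelism instead.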

Another issue is that we read from and send to two different Kafka clusters. The cluster that receives our messages accepts them more slowly than we can read them from the other cluster.
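For context, the two clusters are wired up via Spring Cloud Stream's multi-binder configuration, roughly along these lines (binder names, broker addresses, and topics are placeholders; the binder `type` may differ depending on which Kafka binder artifact is on the classpath):

```yaml
spring:
  cloud:
    stream:
      binders:
        source-cluster:            # cluster we consume from (placeholder name)
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder.brokers: source-broker:9092
        target-cluster:            # slower cluster we produce to (placeholder name)
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder.brokers: target-broker:9092
      bindings:
        process-in-0:
          destination: input-topic
          binder: source-cluster
        process-out-0:
          destination: output-topic
          binder: target-cluster
```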

As stated in the official Reactor Kafka documentation:

5.2.4. Non-blocking back-pressure

The number of in-flight sends can be controlled using the maxInFlight option. Requests for more elements from upstream are limited by the configured maxInFlight to ensure that the total number of requests at any time for which responses are pending are limited. Along with buffer.memory and max.block.ms options on KafkaProducer, maxInFlight enables control of memory and thread usage when KafkaSender is used in a reactive pipeline. This option can be configured on SenderOptions before the KafkaSender is created. Default value is 256. For small messages, a higher value will improve throughput.

So we thought that setting the maximum number of in-flight requests via a SenderOptionsCustomizer would signal to the upstream KafkaReceiver how many messages should be consumed. Apparently it does not work the way we expected.
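For reference, this is roughly what we tried. The sketch assumes the customizer has the (bindingName, options) -> options shape described for the reactive binder; the exact package of SenderOptionsCustomizer depends on the binder version, so its import is omitted here:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
// import of SenderOptionsCustomizer from the reactive Kafka binder omitted

@Configuration
public class SenderTuning {

    @Bean
    public SenderOptionsCustomizer senderOptionsCustomizer() {
        return (bindingName, senderOptions) ->
                // Caps the number of in-flight *sends* on the producer side;
                // it does not by itself throttle how fast the KafkaReceiver
                // requests records upstream.
                senderOptions.maxInFlight(8);
    }
}
```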

What's the proper way to limit memory consumption using the Spring Cloud Stream Kafka Reactive binder?


There are 0 answers