We have implemented a couple of Reactor Kafka-based Spring Boot WebFlux services. Within these services we use the approach described in section "5.12. Concurrent Processing with Partition-Based Ordering" of the Reactor Kafka Reference Guide:
return reactiveKafkaConsumerTemplateUpdates.receive()
        .retryWhen(consumerProperties.getRetrySpecification().toRetry())
        .groupBy(receiverRecord -> receiverRecord.receiverOffset().topicPartition().partition())
        .flatMap(partitionFlux ->
                partitionFlux
                        // perform database updates on another scheduler
                        .publishOn(scheduler)
                        .concatMap(this::consumeUpdateEvent)
                        // commit the processed records according to the defined commit interval
                        .sample(consumerProperties.getCommitInterval())
                        .concatMap(databaseResult -> databaseResult.receiverOffset().commit())
        );
In the code above we materialize Kafka events into a PostgreSQL database, for example, but we have other services using the same groupBy()/sample()/commit() approach.
The problem is that sometimes, under heavy load or perhaps during the downtime of one broker, we receive the following error:
ERROR in the main kafka consuming chain: reactor.core.Exceptions$OverflowException:
Can't signal value due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:249)
at reactor.core.publisher.FluxSample$SampleOther.onNext(FluxSample.java:279)
at de.datev.ediio.common.reactive.mdc.MdcContextLifter.onNext(MdcContextLifter.java:35)
at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:125)
at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.runAndReset(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
After that, the complete chain is dead and the consumer stops processing all partitions.
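This "whole chain dies" behavior can be reproduced without Kafka: once any single group errors, the groupBy()/flatMap() pipeline terminates for all groups. A minimal sketch in plain Reactor (class and message names are hypothetical, only for illustration):

```java
import reactor.core.publisher.Flux;

public class ChainDeathDemo {

    // Returns the error that terminated the whole chain, or null if it completed normally.
    static Throwable run() {
        try {
            Flux.range(0, 100)
                    .groupBy(i -> i % 4)                       // stands in for partition-based grouping
                    .flatMap(group -> group.map(i -> {
                        if (i == 42) {                         // simulate a failure in one group only
                            throw new IllegalStateException("overflow in one partition");
                        }
                        return i;
                    }))
                    .blockLast();
            return null;
        } catch (RuntimeException e) {
            return e;                                          // error from ONE group kills ALL groups
        }
    }

    public static void main(String[] args) {
        System.out.println("chain terminated with: " + run().getMessage());
    }
}
```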
The message "Can't signal value due to lack of requests" comes from https://github.com/reactor/reactor-core/blob/main/reactor-core/src/main/java/reactor/core/publisher/FluxSample.java#L279
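As far as we understand it, sample() requests an unbounded amount from its source and keeps only the latest value, but when the sampling timer fires while the downstream (here: the commit concatMap) has no outstanding demand, it errors instead of dropping the value. The overflow can be reproduced with plain Reactor under those assumptions (durations and the class name are arbitrary, chosen only so the demo fails quickly):

```java
import java.time.Duration;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class SampleOverflowDemo {

    // Returns the terminal error, or null if none occurred within the time limit.
    static Throwable runDemo() {
        try {
            Flux.interval(Duration.ofMillis(1))                 // fast source, like a busy partition
                    .sample(Duration.ofMillis(5))               // periodic sampler, like the commit interval
                    .concatMap(v -> Mono.delay(Duration.ofMillis(100)).thenReturn(v)) // slow "commit"
                    .take(Duration.ofSeconds(10))               // safety net so the demo always terminates
                    .blockLast();
            return null;
        } catch (RuntimeException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        Throwable t = runDemo();
        System.out.println("terminated with: " + t);
        // Once concatMap's prefetch demand is used up, the next timer tick
        // finds requested == 0 and sample() fails with the OverflowException.
        System.out.println("is overflow: " + Exceptions.isOverflow(t));
    }
}
```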
In one of our services we simply removed the sample() and commit manually on a per-event basis.
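Stripped of the Kafka specifics, that per-event variant behaves roughly like the following sketch; commit() and the committed list are hypothetical stand-ins for receiverOffset().commit(). Because every operator in this chain is purely demand-driven (no timer-based emission), no value can ever be signalled without a matching request, however slow the processing is:

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class PerEventCommitDemo {

    static final List<Long> committed = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for receiverOffset().commit(): just records the offset.
    static Mono<Void> commit(long offset) {
        return Mono.fromRunnable(() -> committed.add(offset));
    }

    static int run() {
        committed.clear();
        Flux.range(0, 200)                                                         // stands in for receive()
                .concatMap(r -> Mono.delay(Duration.ofMillis(1)).thenReturn((long) (int) r)) // slow per-record work
                .concatMap(PerEventCommitDemo::commit)                             // commit every single record
                .blockLast();
        return committed.size();
    }

    public static void main(String[] args) {
        System.out.println("committed " + run() + " offsets");
    }
}
```

The downside, of course, is one commit round-trip per record instead of one per interval, which is why we would prefer to keep the sampled variant in the service that has to be fast.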
But we have a service that has to be fast, so: what is wrong with using sample() in this strategy, which is part of the Reactor Kafka Reference Guide?