Reactor Kafka "Can't signal value due to lack of requests" when using sample() - "Concurrent Processing with Partition-Based Ordering"


We have implemented a couple of Spring Boot WebFlux services based on Reactor Kafka. In these services we use the approach described in section "5.12. Concurrent Processing with Partition-Based Ordering" of the Reactor Kafka Reference Guide.

return reactiveKafkaConsumerTemplateUpdates.receive()
    .retryWhen(consumerProperties.getRetrySpecification().toRetry())
    .groupBy(receiverRecord -> receiverRecord.receiverOffset().topicPartition().partition())
    .flatMap(partitionFlux ->
        partitionFlux
            // perform database updates on another scheduler
            .publishOn(scheduler)
            .concatMap(this::consumeUpdateEvent)
            // commit the processed records according to the defined commit interval
            .sample(consumerProperties.getCommitInterval())
            .concatMap(databaseResult -> databaseResult.receiverOffset().commit())
    );

The code above, for example, materializes Kafka events into a PostgreSQL database, but we have other services that use the same groupBy/../sample()/commit approach.

The problem is that sometimes, under heavy load or possibly during downtime of one broker, we receive the following error:

ERROR in the main kafka consuming chain:  reactor.core.Exceptions$OverflowException:
Can't signal value due to lack of requests
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:249)
    at reactor.core.publisher.FluxSample$SampleOther.onNext(FluxSample.java:279)
    at de.datev.ediio.common.reactive.mdc.MdcContextLifter.onNext(MdcContextLifter.java:35)
    at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:125)
    at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
    at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.runAndReset(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

After that, the complete chain is dead.

The message "Can't signal value due to lack of requests" comes from https://github.com/reactor/reactor-core/blob/main/reactor-core/src/main/java/reactor/core/publisher/FluxSample.java#L279
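To make the suspected mechanism concrete, here is a minimal, stdlib-only sketch of what we think happens. It is a simplified model and not Reactor's actual code: the class SampleDemandModel, its SamplingStage, and the method ticksUntilOverflow are hypothetical names invented for this illustration. The assumption is that the sampling stage emits on every timer tick regardless of whether the downstream commit stage has finished, and that the downstream concatMap requests a fixed prefetch once (32 by default, as far as we know) and stops requesting while a commit is stalled.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Simplified model of a timer-driven sampling stage. Hypothetical, not Reactor's implementation. */
public class SampleDemandModel {

    /** Tracks outstanding downstream demand, similar to a Subscription's request(n) counter. */
    static final class SamplingStage {
        private final AtomicLong requested = new AtomicLong();
        private String latest;

        /** Downstream signals that it can accept n more elements. */
        void request(long n) {
            requested.addAndGet(n);
        }

        /** Upstream delivers a value; only the most recent one is kept. */
        void onUpstreamValue(String value) {
            latest = value;
        }

        /** Called on every timer tick; returns the emitted value, or null if there is nothing to emit. */
        String onTick() {
            String value = latest;
            latest = null;
            if (value == null) {
                return null;
            }
            if (requested.get() == 0) {
                // The timer cannot be paused, so the stage can only fail here.
                throw new IllegalStateException("Can't signal value due to lack of requests");
            }
            requested.decrementAndGet();
            return value;
        }
    }

    /**
     * Simulates a downstream that requests its prefetch once and then stalls
     * (for example because a commit never completes).
     * Returns the tick at which the overflow occurs, or -1 if it does not occur.
     */
    public static int ticksUntilOverflow(int downstreamPrefetch, int ticks) {
        SamplingStage stage = new SamplingStage();
        stage.request(downstreamPrefetch);
        for (int tick = 1; tick <= ticks; tick++) {
            stage.onUpstreamValue("record-" + tick);
            try {
                stage.onTick();
            } catch (IllegalStateException e) {
                return tick;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(ticksUntilOverflow(32, 100));
    }
}
```

In this model the stage fails on the first tick after the prefetch is used up. If the model matches the real behaviour, an operator that requests unbounded demand from sample() and keeps only the newest element, such as onBackpressureLatest() placed between sample() and the committing concatMap, might avoid the error; we have not verified this.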

In one of our services we simply removed sample() and now commit manually on a per-event basis.

But we have a service that has to be fast. So what is wrong with sample() in this strategy, which is taken from the Reactor Kafka Reference Guide?
