In Reactor, when I have a fast producer but a slow consumer, and the values in the stream are like "snapshots", I would like the consumer to process the latest value in the stream and drop the others. (For example, a consumer that shows the exchange value in a GUI vs. a producer that converts the exchange ticks to a Flux.)
The Flux#onBackpressureLatest() operator seems to be the right way to go.
I did some Googling and found some usage examples:
Flux.range(1, 30)
    .delayElements(Duration.ofMillis(500))
    .onBackpressureLatest()
    .delayElements(Duration.ofMillis(3000))
    .subscribe { println("got $it") }
This puts a manual delay after onBackpressureLatest(). It's more like Flux#sample(Duration) than a slow consumer.
Internally, the delayElements(Duration) operator wraps a concatMap, so I converted this into:
Flux.range(1, 30)
    .delayElements(Duration.ofMillis(500))
    .onBackpressureLatest()
    .concatMap { Mono.just(it).subscribeOn(Schedulers.boundedElastic()) }
    .subscribe { item ->
        println("got $item")
        // simulate slow subscriber with sleep
        Thread.sleep(3000)
    }
This is like the answers provided in the question "Latest overflow strategy with size 1 or any alternatives". However, it looks a bit weird. I don't understand why we need the concatMap(op) or flatMap(op, 1, 1) call to get onBackpressureLatest() working.
I tried the following (simplified) versions, but they do not work as expected. Why?
// not working try - 1
Flux.range(1, 30)
    .delayElements(Duration.ofMillis(500))
    .onBackpressureLatest()
    .publishOn(Schedulers.boundedElastic())
    .subscribe { item ->
        println("got $item")
        // simulate slow subscriber with sleep
        Thread.sleep(3000)
    }
// not working try - 2
Flux.range(1, 30)
    .delayElements(Duration.ofMillis(500))
    .onBackpressureLatest()
    .publishOn(Schedulers.boundedElastic())
    .subscribe(object : BaseSubscriber<Int>() {
        override fun hookOnSubscribe(subscription: Subscription) {
            // explicitly request 1
            subscription.request(1)
        }
        override fun hookOnNext(value: Int) {
            // simulate slow subscriber with sleep
            Thread.sleep(3000)
            println("got $value")
            // explicitly request 1
            request(1)
        }
    })
To answer my own question
When the consumer is slow and the producer is fast, they need to run on different scheduler threads. If they run on the same thread, the whole Flux chain is synchronous, and the consumer and producer run at the same pace on that single thread. So a chain with no thread switch between the producer and the consumer won't work: the consumer still receives every value.
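The single-threaded case can be illustrated with a minimal sketch. It is not the snippet originally attached to this answer; it assumes a purely synchronous source (Flux.range without delayElements), a shortened sleep, and a hypothetical helper named runWithoutThreadSwitch, chosen only to make the effect easy to observe.

```kotlin
import reactor.core.publisher.Flux

// Hypothetical helper for illustration: no scheduler switch anywhere in the chain.
fun runWithoutThreadSwitch(): List<Int> {
    val received = mutableListOf<Int>()
    Flux.range(1, 10)
        .onBackpressureLatest()
        .subscribe { item ->
            // The producer runs on this same thread, so it cannot emit
            // while the consumer is sleeping and nothing gets dropped.
            received.add(item)
            Thread.sleep(20) // simulate a slow subscriber (shortened for the sketch)
        }
    // subscribe() returns only after the synchronous source has completed.
    return received
}

fun main() {
    println(runWithoutThreadSwitch()) // every value from 1 to 10 is received
}
```

Since everything happens on the calling thread, onBackpressureLatest() never sees a value arrive while demand is missing, so it has nothing to drop.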
So we need to switch the scheduler thread after the producer to make sure the consumer runs on a different thread.
If we switch the scheduler with .publishOn before .onBackpressureLatest(), the rest of the operator chain runs on the same thread. It's as if we just started another thread and ran the synchronous flux flow there, which is essentially the same as the case above, so this doesn't work either.
If we put .publishOn(Schedulers.boundedElastic()) after .onBackpressureLatest(), it doesn't work either. The reason is that the one-argument publishOn method uses a default prefetch value of Queues.SMALL_BUFFER_SIZE = 256. So it will request(256) on subscription, which tells .onBackpressureLatest() that the downstream needs 256 items. Thus .onBackpressureLatest() will pass up to 256 values directly to .publishOn (as they become available), and the chain after .publishOn consumes the items synchronously. So a chain like the first attempt in the question doesn't work as expected.
So what we need is to make sure the operator chain after .onBackpressureLatest() requests 1 only when it is ready to process the next item, i.e., at the speed of the consumer. We just need to call .publishOn with the second prefetch argument, i.e. .publishOn(Schedulers.boundedElastic(), 1).
The following two alternatives described in the question can be used to replace the .publishOn line. They do the same: 1) switch the scheduler thread and 2) ensure a back pressure of 1 to onBackpressureLatest():
    .concatMap { Mono.just(it).subscribeOn(Schedulers.boundedElastic()) }
    .flatMap({ Mono.just(it).subscribeOn(Schedulers.boundedElastic()) }, 1, 1)
The same approach can also be written with BaseSubscriber<T>, keeping the prefetch of 1 on .publishOn and requesting one item at a time from the subscriber.
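As a sketch of how the pieces above might fit together (not necessarily the code originally attached to this answer), the following combines publishOn with a prefetch of 1 and a BaseSubscriber that requests one item at a time. The helper name runWithPrefetchOne, the latch, and the shortened delays (20 ms producer, 100 ms consumer) are assumptions made for illustration.

```kotlin
import org.reactivestreams.Subscription
import reactor.core.publisher.BaseSubscriber
import reactor.core.publisher.Flux
import reactor.core.publisher.SignalType
import reactor.core.scheduler.Schedulers
import java.time.Duration
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch

// Hypothetical helper for illustration: returns the values the slow consumer saw.
fun runWithPrefetchOne(): List<Int> {
    val received = CopyOnWriteArrayList<Int>()
    val done = CountDownLatch(1)

    Flux.range(1, 30)
        .delayElements(Duration.ofMillis(20))      // fast producer (shortened from 500 ms)
        .onBackpressureLatest()                    // keeps only the newest value while there is no demand
        .publishOn(Schedulers.boundedElastic(), 1) // switch thread and prefetch 1 instead of the default 256
        .subscribe(object : BaseSubscriber<Int>() {
            override fun hookOnSubscribe(subscription: Subscription) {
                subscription.request(1) // ask for the first item only
            }

            override fun hookOnNext(value: Int) {
                Thread.sleep(100) // simulate a slow subscriber (shortened from 3000 ms)
                received.add(value)
                println("got $value")
                request(1) // ask for the next item once this one is processed
            }

            override fun hookFinally(type: SignalType) {
                done.countDown() // completion, error, or cancellation
            }
        })

    done.await() // keep the calling thread alive until the stream terminates
    return received.toList()
}

fun main() {
    val values = runWithPrefetchOne()
    println("received ${values.size} of 30 values: $values")
}
```

With these timings the consumer should see only a handful of the 30 values, starting with 1 and ending with 30, because onBackpressureLatest() holds on to the last value until it is requested. The exact values in between depend on thread timing.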