I am using Reactor and am am creating a flux to which I am publishing some events. My issue is that the subscribers that I create with filters fail after a while unless I add a non filter subscriber on the flux.
import reactor.core.publisher.EmitterProcessor
class PublishSubscribe {
companion object {
@JvmStatic
fun main(args: Array<String>) {
val publisher = EmitterProcessor.create<String>().connect()
writeAndGet(publisher)
writeAndGet(publisher)
writeAndGet(publisher)
}
fun writeAndGet(publisher: EmitterProcessor<String>) {
val result = publisher
.filter { true }
.takeUntil { it == "end" }
.collectList()
.subscribe()
val result2 = publisher
.filter { true }
.takeUntil { it == "end" }
.collectList()
.subscribe()
Thread.sleep(1000)
publisher.onNext("unu")
publisher.onNext("end")
try {
println("X=" + result.blockMillis(3000))
println("Y=" + result2.blockMillis(3000))
} catch (e: Exception) {
e.printStackTrace()
}
println(result.isTerminated)
println(result2.isTerminated)
println("---")
}
}
}
The code works fine if I an extra subscriber.
...
val publisher = EmitterProcessor.create<String>().connect()
publisher.subscribe() //this solves the issue
writeAndGet(publisher)
...
Any ideas about what I am doing wrong?
Best regards
EmitterProcessor.create()creates processor withautoCancelflag set totrue. Which means it gets automatically canceled as soon as all subscribers unsubscribe.What makes your subscribers unsubscribe is not
filterbuttakeUntiloperator.Adding an extra permanent subscriber keeps processor from auto-canceling but that does not seem to be a good solution.
For your test case to work you have to create processor with
EmitterProcessor.create(false). This setsautoCanceltofalse, so you can re-subscribe again and again.