I am using Reactor and am creating a Flux to which I publish some events. My issue is that the subscribers I create with filters fail after a while unless I also add a non-filtering subscriber to the Flux.
```kotlin
import reactor.core.publisher.EmitterProcessor

class PublishSubscribe {
    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            val publisher = EmitterProcessor.create<String>().connect()
            writeAndGet(publisher)
            writeAndGet(publisher)
            writeAndGet(publisher)
        }

        fun writeAndGet(publisher: EmitterProcessor<String>) {
            val result = publisher
                .filter { true }
                .takeUntil { it == "end" }
                .collectList()
                .subscribe()
            val result2 = publisher
                .filter { true }
                .takeUntil { it == "end" }
                .collectList()
                .subscribe()
            Thread.sleep(1000)
            publisher.onNext("unu")
            publisher.onNext("end")
            try {
                println("X=" + result.blockMillis(3000))
                println("Y=" + result2.blockMillis(3000))
            } catch (e: Exception) {
                e.printStackTrace()
            }
            println(result.isTerminated)
            println(result2.isTerminated)
            println("---")
        }
    }
}
```
The code works fine if I add an extra subscriber.
```kotlin
...
val publisher = EmitterProcessor.create<String>().connect()
publisher.subscribe() // this solves the issue
writeAndGet(publisher)
...
```
Any ideas about what I am doing wrong?
Best regards
`EmitterProcessor.create()` creates a processor with the `autoCancel` flag set to `true`, which means the processor is cancelled automatically as soon as all of its subscribers have unsubscribed. What makes your subscribers unsubscribe is not `filter` but the `takeUntil` operator: once it sees "end", it cancels its subscription. Adding an extra permanent subscriber keeps the processor from auto-cancelling, but that does not seem to be a good solution.
For your test case to work, you have to create the processor with `EmitterProcessor.create(false)`. This sets `autoCancel` to `false`, so you can subscribe again and again.
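As a rough sketch of that change, here is a cut-down version of your test with the flag turned off. It reuses the calls from your question (`connect()`, `blockMillis`), so I am assuming they are available in your Reactor version. The `repeat(3)` loop and the single subscriber per round are my simplifications, not something your code needs.

```kotlin
import reactor.core.publisher.EmitterProcessor

fun main(args: Array<String>) {
    // autoCancel = false: the processor is not cancelled when its last subscriber goes away
    val publisher = EmitterProcessor.create<String>(false).connect()

    // Stands in for the three writeAndGet(publisher) calls in the question
    repeat(3) { round ->
        // takeUntil cancels this subscription once "end" arrives
        val result = publisher
            .filter { true }
            .takeUntil { it == "end" }
            .collectList()
            .subscribe()

        publisher.onNext("unu")
        publisher.onNext("end")

        // Wait up to 3 seconds for the collected list, as in the question
        println("round $round: " + result.blockMillis(3000))
    }
}
```

With `autoCancel` disabled, each round's subscriber should complete instead of timing out, and the extra `publisher.subscribe()` workaround should no longer be needed.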