Reactor - Flux requires non filter subscriber

902 views Asked by At

I am using Reactor and am am creating a flux to which I am publishing some events. My issue is that the subscribers that I create with filters fail after a while unless I add a non filter subscriber on the flux.

import reactor.core.publisher.EmitterProcessor

class PublishSubscribe {

companion object {
    @JvmStatic
    fun main(args: Array<String>) {
        val publisher = EmitterProcessor.create<String>().connect()
        writeAndGet(publisher)
        writeAndGet(publisher)
        writeAndGet(publisher)

    }

    fun writeAndGet(publisher: EmitterProcessor<String>) {
        val result = publisher
                .filter { true }
                .takeUntil { it == "end" }
                .collectList()
                .subscribe()

        val result2 = publisher
                .filter { true }
                .takeUntil { it == "end" }
                .collectList()
                .subscribe()


        Thread.sleep(1000)

        publisher.onNext("unu")
        publisher.onNext("end")

        try {

            println("X=" + result.blockMillis(3000))
            println("Y=" + result2.blockMillis(3000))

        } catch (e: Exception) {
            e.printStackTrace()
        }
        println(result.isTerminated)
        println(result2.isTerminated)
        println("---")

    }

}

}

The code works fine if I an extra subscriber.

...
val publisher = EmitterProcessor.create<String>().connect()
publisher.subscribe()  //this solves the issue
writeAndGet(publisher)
...

Any ideas about what I am doing wrong?

Best regards

1

There are 1 answers

1
Yaroslav Stavnichiy On BEST ANSWER

EmitterProcessor.create() creates processor with autoCancel flag set to true. Which means it gets automatically canceled as soon as all subscribers unsubscribe.

What makes your subscribers unsubscribe is not filter but takeUntil operator.

Adding an extra permanent subscriber keeps processor from auto-canceling but that does not seem to be a good solution.

For your test case to work you have to create processor with EmitterProcessor.create(false). This sets autoCancel to false, so you can re-subscribe again and again.