Coroutines BroadcastChannel/ReceiveChannel event bus skips first element


This is my event bus implementation using BroadcastChannel (RxJava2 is not allowed :( ).

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.*
import kotlin.coroutines.CoroutineContext

class EventBus(
    override val coroutineContext: CoroutineContext = Dispatchers.Default
) : CoroutineScope {

    @ExperimentalCoroutinesApi
    private val channel = BroadcastChannel<Any>(1)

    @ExperimentalCoroutinesApi
    suspend fun send(event: Any) {
        channel.send(event)
    }

    @ExperimentalCoroutinesApi
    fun subscribe(): ReceiveChannel<Any> =
        channel.openSubscription()

    @ExperimentalCoroutinesApi
    inline fun <reified T> subscribeToEvent() =
        subscribe().let {
            produce<T>(coroutineContext) {
                for (t in it){
                    if(t is T)
                        send(t as T)
                }
            }

        }
}

And here is my testing code.

@Test
fun testEventBus(){
    val bus = EventBus()
    var i = 1
    var isFinish = false
    CoroutineScope(Dispatchers.IO).launch{
        println("launching_subsc")
        bus.subscribe().consumeEach {
            println("received $it")
            assert(it == i++)
            isFinish = (it == 5)
        }
        withContext(bus.coroutineContext){

        }
    }
    bus.launch {
        delay(500)
        for (j in 1..5) {
            println("sending $j")
            bus.send(j)
            sleep(500)
        }
    }

    while (!isFinish)
        sleep(50)
}

This test works fine, but I want to remove the delay(500) and still have the test pass. If I remove the delay(500), the output is

launching_subsc
sending 1
sending 2
received 2
...

or

sending 1
launching_subsc
sending 2
received 2
...

What I actually need: in the real project, data is published after the subscription, and if there is no subscriber the events should simply be dropped. So if the subscribe call happens before the publish call (irrespective of Dispatchers), the subscriber must receive all the events.

I have tried using the same scope/dispatcher, but nothing works.
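
For reference, the requirement above (subscribe first, then publish, and receive every event) can be sketched as follows. In the test, bus.subscribe() runs inside a launched coroutine, so it races with the first send. This is only a minimal sketch and not a confirmed fix. It assumes the subscription is opened on the calling thread before the sender coroutine is started, and that the kotlinx.coroutines version in use still allows BroadcastChannel (it is deprecated in later releases). The function name collectEvents is hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: returns the events seen by a subscriber that was
// registered before the sender coroutine was started.
@OptIn(ExperimentalCoroutinesApi::class, ObsoleteCoroutinesApi::class)
fun collectEvents(): List<Any> = runBlocking {
    val channel = BroadcastChannel<Any>(1)

    // Assumption of this sketch: openSubscription() is called here,
    // synchronously, so the subscriber exists before any send() happens.
    val subscription = channel.openSubscription()

    val received = mutableListOf<Any>()
    val consumer = launch(Dispatchers.IO) {
        for (event in subscription) {
            received += event
            if (event == 5) break // stop after the last expected event
        }
        subscription.cancel()
    }

    // The sender runs on a different dispatcher and has no delay.
    launch(Dispatchers.Default) {
        for (j in 1..5) channel.send(j)
    }

    consumer.join()
    received
}

fun main() {
    println(collectEvents())
}
```

With the EventBus class from the question, the equivalent would presumably be to call bus.subscribe() outside the launch block, keep the returned ReceiveChannel in a val, and only iterate over it inside the coroutine.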
