This is my event bus implementation using a BroadcastChannel. (RxJava2 is not allowed :( )
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.*
import kotlin.coroutines.CoroutineContext

class EventBus(
    override val coroutineContext: CoroutineContext = Dispatchers.Default
) : CoroutineScope {

    @ExperimentalCoroutinesApi
    private val channel = BroadcastChannel<Any>(1)

    @ExperimentalCoroutinesApi
    suspend fun send(event: Any) {
        channel.send(event)
    }

    @ExperimentalCoroutinesApi
    fun subscribe(): ReceiveChannel<Any> =
        channel.openSubscription()

    @ExperimentalCoroutinesApi
    inline fun <reified T> subscribeToEvent() =
        subscribe().let {
            produce<T>(coroutineContext) {
                for (t in it) {
                    if (t is T)
                        send(t as T)
                }
            }
        }
}
And here is my test code.
@Test
fun testEventBus() {
    val bus = EventBus()
    var i = 1
    var isFinish = false
    CoroutineScope(Dispatchers.IO).launch {
        println("launching_subsc")
        bus.subscribe().consumeEach {
            println("received $it")
            assert(it == i++)
            isFinish = (it == 5)
        }
        withContext(bus.coroutineContext) {
        }
    }
    bus.launch {
        delay(500)
        for (j in 1..5) {
            println("sending $j")
            bus.send(j)
            sleep(500)
        }
    }
    while (!isFinish)
        sleep(50)
}
This test works, but I want to remove the delay(500) and still have the test pass. If I remove the delay(500), the first event is lost and the output is
launching_subsc
sending 1
sending 2
received 2
...
or
sending 1
launching_subsc
sending 2
received 2
...
What I actually need is this: in the real project, data should be published after the subscription, and if there is no subscriber, the events should be dropped. So if the subscribe call happens before the publish call (irrespective of Dispatchers), the subscriber must receive all the events.
I have tried using the same scope/dispatcher for both, but nothing works.
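To make the ordering requirement concrete, here it is sketched in isolation. This is only a minimal sketch, not a confirmed fix: it assumes a kotlinx.coroutines version in which BroadcastChannel is still available (newer releases deprecate it in favor of SharedFlow), and the helper name collectAfterSubscribing is hypothetical. The subscription is opened synchronously, before any consumer or producer coroutine is launched, so registering the subscriber does not race with the first send:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper for illustration; not part of the EventBus class above.
@OptIn(ExperimentalCoroutinesApi::class, ObsoleteCoroutinesApi::class)
fun collectAfterSubscribing(): List<Any> = runBlocking {
    val channel = BroadcastChannel<Any>(1)

    // Register the subscription here, on the calling thread,
    // before anything is sent and before the consumer is launched.
    val subscription = channel.openSubscription()

    val received = mutableListOf<Any>()
    val consumer = launch(Dispatchers.IO) {
        // Only the consumption runs on another dispatcher.
        for (event in subscription) received += event
    }

    // send() suspends while the one-element buffer is full.
    for (j in 1..5) channel.send(j)

    channel.close()   // lets the consumer's for-loop terminate
    consumer.join()   // wait until the consumer has drained the subscription
    received
}

fun main() {
    println(collectAfterSubscribing())
}
```

In my test, the corresponding change would presumably be to call bus.subscribe() before the first launch and only consume the returned channel inside it, but I have not verified this in the real project.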