I have a library that produces values from reading data from different sources and offers them to a flow from which the user of the library can collect

to do so I use this code

private val dataChannel = BroadcastChannel<Reading>(10) 

val dataFlow get() = dataChannel.asFlow()

//this may be triggered by a 5 different threads
fun newReading(type:String, value:Float, time:Date, level:Int) {
    dataChannel.offer(Reading(type,value,time,Level.fromValue(level))
}

the user can do something like this to get the data

lastJob?.cancel()
lastJob = launch {
    lib.dataFlow.flowOn(Dispatchers.Default).collect { reading ->
       val result = processReading(reading)
       withContext(Dispatchers.IO) {
           Toast.makeText(application.applicationContext, result,Toast.LENGTH_LONG).show()
       }
    }
}

this works as expected, when the user launches the job my library may keep on sending data and the user of the library will keep on receiving them

however if the user cancels lastJob and then after a few seconds launches it again they will loose the data received by my library between those two times

is there away to make dataChannel store all the readings received through offer in that timespan (up to the buffer level of 10 set in its constructor) and when the dataFlow becomes active again to emit those values at the same time?

1

There are 1 answers

4
Róbert Nagy On

You can try out some of the following options:

  1. BroadcastChannel<String>(Channel.BUFFERED).asFlow(), this will initialise your channel with the capacity of the DEFAULT_BUFFER_PROPERTY_NAME, which is 64 by default, but in can be overridden on JVM.
  2. You can create a Channel(Channel.UNLIMITED) then consumeAsFlow(), but yeah, having an unlimited buffer is generally a bad practice
  3. Create a Channel(Channel.RENDEZVOUS), and use send() instead of offer(). Rendezvous channels are basically channels with 0 capacity and whenever your values are not consumed/collected, it will suspend the send() call.