I have a library that produces values from reading data from different sources and offers them to a flow from which the user of the library can collect
to do so I use this code
private val dataChannel = BroadcastChannel<Reading>(10)
val dataFlow get() = dataChannel.asFlow()
//this may be triggered by a 5 different threads
fun newReading(type:String, value:Float, time:Date, level:Int) {
dataChannel.offer(Reading(type,value,time,Level.fromValue(level))
}
the user can do something like this to get the data
lastJob?.cancel()
lastJob = launch {
lib.dataFlow.flowOn(Dispatchers.Default).collect { reading ->
val result = processReading(reading)
withContext(Dispatchers.IO) {
Toast.makeText(application.applicationContext, result,Toast.LENGTH_LONG).show()
}
}
}
this works as expected, when the user launches the job my library may keep on sending data and the user of the library will keep on receiving them
however if the user cancels lastJob
and then after a few seconds launches it again they will loose the data received by my library between those two times
is there away to make dataChannel
store all the readings received through offer
in that timespan (up to the buffer level of 10 set in its constructor) and when the dataFlow
becomes active again to emit those values at the same time?
You can try out some of the following options:
BroadcastChannel<String>(Channel.BUFFERED).asFlow()
, this will initialise your channel with the capacity of theDEFAULT_BUFFER_PROPERTY_NAME
, which is 64 by default, but in can be overridden on JVM.Channel(Channel.UNLIMITED)
thenconsumeAsFlow()
, but yeah, having an unlimited buffer is generally a bad practiceChannel(Channel.RENDEZVOUS)
, and usesend()
instead ofoffer()
. Rendezvous channels are basically channels with 0 capacity and whenever your values are not consumed/collected, it will suspend thesend()
call.