In Kotlin Coroutines, how do one verify if and when any flow backpressure occurs, that is, if I have a flow that is using buffer(), how do I check and log each time the producer and/or the consumer suspend themselves due to backpressure?
I would like to do this to fine-tune the flow buffer size, and also the parallelism from the consumer side. For the sake of example, I'll provide a simple example:
suspend fun main() {
val bufferedFlow = createFlow().buffer()
// Here, the collector is much slower than the producer
bufferedFlow.collect {
delay(100)
println("Collecting: $it")
}
}
// Produces numbers at a fast pace
fun createFlow(): Flow<Int> = (0..Integer.MAX_VALUE).asFlow().onEach {
delay(10)
println("Emitting: $it")
}
In this example, the consumer is overwhelmed by the producer, but since the producer supports backpressure, it is suspended until the consumer consumes an item from the buffer. Of course, in this example, it's obvious which side is fast and which side is slow, but there are scenarios where things aren't that clear.
If one would like to detect and log each time the producer or the consumer suspends due to backpressure from the other side, how could he do this?
I also posted this same exact question on Kotlin Slack, and got a fascinating answer from Zach Klippenstein of how we can detect backpressure.
First, we write a method that can detect whenever a method inside its lambda suspends.
Then, with this method, we can create another method that wraps the flow producer to find out each time it suspends itself.
And with
onBackpressuremethod we have everything we need to write a method that performs an arbitrary action each time the flow producer suspends. For the sake of having an example, here's a method that wraps abuffer()(without giving up on any of its features) and logs if and how long the producer has been suspended since the last emission.And with this code we are able to backpressure on the flow, thus this is the answer for my own question.