How to detect and log whenever backpressure occurs when using Flows with Kotlin Coroutines


In Kotlin Coroutines, how does one verify whether and when flow backpressure occurs? That is, if I have a flow that uses buffer(), how do I detect and log each time the producer and/or the consumer suspends because of backpressure?

I would like to do this to fine-tune the flow's buffer size and the parallelism on the consumer side. Here is a simple example:

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

suspend fun main() {
    val bufferedFlow = createFlow().buffer()

    // Here, the collector is much slower than the producer
    bufferedFlow.collect {
        delay(100)
        println("Collecting: $it")
    }
}

// Produces numbers at a fast pace
fun createFlow(): Flow<Int> = (0..Int.MAX_VALUE).asFlow().onEach {
    delay(10)
    println("Emitting: $it")
}

In this example, the consumer is overwhelmed by the producer, but because buffer() supports backpressure, the producer is suspended whenever the buffer is full, until the consumer takes an item from it. Here it's obvious which side is fast and which is slow, but in other scenarios things aren't that clear.

If I want to detect and log each time the producer or the consumer suspends due to backpressure from the other side, how can I do this?


There are 2 answers

SecretX (best answer)

I also posted this exact question on the Kotlin Slack and got a fascinating answer from Zach Klippenstein showing how backpressure can be detected.

First, we write a function that detects whenever the code inside its lambda suspends.

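import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.startCoroutineUninterceptedOrReturn
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn

// Runs block and calls onSuspended if block suspends instead of returning immediately
suspend inline fun <T> onSuspended(
    crossinline onSuspended: () -> Unit,
    noinline block: suspend () -> T
): T = suspendCoroutineUninterceptedOrReturn { cont ->
    // Start block directly on the caller's continuation: it either returns its result
    // synchronously, or returns COROUTINE_SUSPENDED and resumes cont later
    val result = block.startCoroutineUninterceptedOrReturn(cont)
    if (result == COROUTINE_SUSPENDED) {
        onSuspended()
    }
    result
}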

Then, with this function, we can write an operator that wraps every emission of the upstream flow and reports each time emit() suspends, i.e. each time the downstream cannot accept the item yet.

// Calls onBackpressure every time emitting an item to the downstream suspends
fun <T> Flow<T>.onBackpressure(
    onBackpressure: () -> Unit
): Flow<T> = flow {
    collect {
        onSuspended(onBackpressure) {
            emit(it)
        }
    }
}
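To see when the callback actually fires, here is a small self-contained check. It is only a sketch: it repeats the two helpers above so it compiles on its own, assumes kotlinx.coroutines 1.6 or later on the classpath, and `countSuspensions` is a hypothetical helper name. There is no buffer() here, so every emit() calls the collector directly; the callback therefore fires once per item when the collector really suspends (delay(1)) and never when it returns immediately (delay(0) does not suspend).

```kotlin
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.startCoroutineUninterceptedOrReturn
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Same helpers as above, repeated so this block compiles on its own
suspend inline fun <T> onSuspended(
    crossinline onSuspended: () -> Unit,
    noinline block: suspend () -> T
): T = suspendCoroutineUninterceptedOrReturn { cont ->
    val result = block.startCoroutineUninterceptedOrReturn(cont)
    if (result == COROUTINE_SUSPENDED) onSuspended()
    result
}

fun <T> Flow<T>.onBackpressure(onBackpressure: () -> Unit): Flow<T> = flow {
    collect { onSuspended(onBackpressure) { emit(it) } }
}

// Hypothetical helper: counts how often emit() suspended while collecting 1..3
fun countSuspensions(collectorDelayMs: Long): Int = runBlocking {
    var count = 0
    (1..3).asFlow()
        .onBackpressure { count++ }
        .collect { delay(collectorDelayMs) }
    count
}

fun main() {
    println(countSuspensions(collectorDelayMs = 1)) // prints 3: every emit waits for the delay
    println(countSuspensions(collectorDelayMs = 0)) // prints 0: delay(0) returns without suspending
}
```

Note that without a buffer every suspension of the collector is reported, which is why the answer below places onBackpressure in front of buffer(): then the callback only fires when the buffer is full.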

With onBackpressure we have everything we need to write an operator that performs an arbitrary action each time the flow producer suspends. As an example, here's an operator that wraps buffer() (keeping all of its parameters) and logs whether, and for how long, the producer was suspended before the consumer received the next item.

suspend fun main() {
    val bufferedFlow = createFlow().bufferLoggingBackpressure()

    // Here, the collector is much slower than the producer
    bufferedFlow.collect {
        delay(100)
        println("Collecting: $it")
    }
}

// Produces numbers at a fast pace
fun createFlow(): Flow<Int> = (0..Int.MAX_VALUE).asFlow().onEach {
    delay(10)
    println("Emitting: $it")
}

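import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
import kotlin.time.Duration.Companion.nanoseconds
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel

fun <T> Flow<T>.bufferLoggingBackpressure(
    capacity: Int = Channel.BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
): Flow<T> {
    // Written by the producer coroutine and read by the consumer coroutine, which may run
    // on different threads, so atomics are used instead of plain vars.
    // Note: this state is shared by every collection of the returned flow.
    val suspended = AtomicBoolean(false)
    val lastSuspension = AtomicLong(0L)

    return onBackpressure {
        // emit() into the full buffer suspended the producer
        lastSuspension.set(System.nanoTime())
        suspended.set(true)
    }.buffer(capacity = capacity, onBufferOverflow = onBufferOverflow).onEach {
        // Consumer side: report the first item received after a producer suspension
        if (suspended.getAndSet(false)) {
            val waited = (System.nanoTime() - lastSuspension.get()).nanoseconds
            println("Backpressure detected, suspended for ${waited.inWholeMicroseconds}us")
        }
    }
}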

With this code we can detect backpressure on the flow, which answers my own question.

broot

First of all, I don't think there is a generic meaning of "backpressure"; it really depends on the specific case. For example, if we don't use a buffer and collect by simply printing, then technically the producer still waits for the consumer to print each item, but usually we don't consider this to be real backpressure.
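The point about unbuffered collection can be checked with a short sketch (it assumes kotlinx.coroutines on the classpath; `unbufferedTrace` is a hypothetical name). Without buffer(), emit() returns only after the collector has finished with the item, so producer and consumer steps strictly alternate even though nothing is "overflowing".

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order of producer and consumer steps without buffer()
fun unbufferedTrace(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow<Int> {
        for (i in 1..3) {
            log += "emit $i"
            emit(i) // returns only after the collector below has handled i
        }
    }.collect { log += "collect $it" }
    log
}

fun main() {
    // prints [emit 1, collect 1, emit 2, collect 2, emit 3, collect 3]
    println(unbufferedTrace())
}
```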

We could assume that by backpressure we mean a situation where the collector suspends. It should be technically doable to create a general-purpose operator for this: it would need to detect that the downstream collector suspended and then notify about it. However, it would probably be a little hacky, as the coroutines API doesn't provide a straightforward way to detect that a callee suspended (though I think it is still possible).

Instead, I suggest another approach. Usually, when we talk about backpressure, we mean a situation where we use a buffer to run the producer and the consumer concurrently. As the buffer operator orchestrates the communication between both sides, this is where we can easily detect backpressure:

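import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*

@OptIn(ExperimentalCoroutinesApi::class) // produce() is marked experimental
fun <T> Flow<T>.bufferWithOverflowDetection(capacity: Int = Channel.BUFFERED, onOverflow: (T) -> Unit): Flow<T> = flow {
    coroutineScope {
        val channel = produce<T>(capacity = capacity) {
            collect {
                // collecting from upstream; trySend fails when the buffer is full
                if (trySend(it).isFailure) {
                    onOverflow(it)
                    // the producer now suspends until the consumer frees a slot
                    send(it)
                }
            }
        }
        channel.consumeEach {
            // emitting to downstream
            emit(it)
        }
    }
}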

Then just replace one line in your example:

val bufferedFlow = createFlow().bufferWithOverflowDetection { println("Overflow with: $it") }
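The operator above reports only the producer side, while the question also asks about the consumer waiting. As a sketch of how the same channel-based approach could cover both sides, the hypothetical `bufferWithBackpressureDetection` below (not from either answer and not part of kotlinx.coroutines) additionally tries a non-suspending `tryReceive()` first and calls `onConsumerWait` when the buffer is empty, i.e. just before the consumer suspends waiting for the producer. `runDemo` is likewise a hypothetical helper; the code assumes kotlinx.coroutines 1.5 or later for `tryReceive`/`receiveCatching`.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical operator: callbacks run in the producer and consumer coroutines respectively,
// which may be different threads outside runBlocking, so shared state needs synchronization.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.bufferWithBackpressureDetection(
    capacity: Int = Channel.BUFFERED,
    onProducerWait: (T) -> Unit = {},
    onConsumerWait: () -> Unit = {},
): Flow<T> = flow {
    coroutineScope {
        val channel = produce<T>(capacity = capacity) {
            collect {
                // Buffer full: the producer is about to suspend in send()
                if (trySend(it).isFailure) {
                    onProducerWait(it)
                    send(it)
                }
            }
        }
        while (true) {
            var result = channel.tryReceive()
            if (result.isFailure && !result.isClosed) {
                // Buffer empty: the consumer is about to suspend waiting for the producer
                onConsumerWait()
                result = channel.receiveCatching()
            }
            if (result.isClosed) {
                // Rethrow an upstream failure, otherwise finish normally
                result.exceptionOrNull()?.let { throw it }
                break
            }
            emit(result.getOrThrow())
        }
    }
}

// Hypothetical helper: returns the collected items and how often each side had to wait
fun runDemo(producerDelayMs: Long, consumerDelayMs: Long): Triple<List<Int>, Int, Int> = runBlocking {
    var producerWaits = 0
    var consumerWaits = 0
    val items = (1..10).asFlow()
        .onEach { delay(producerDelayMs) }
        .bufferWithBackpressureDetection(
            capacity = 2,
            onProducerWait = { producerWaits++ },
            onConsumerWait = { consumerWaits++ },
        )
        .onEach { delay(consumerDelayMs) }
        .toList()
    Triple(items, producerWaits, consumerWaits)
}

fun main() {
    val (_, producerWaits, _) = runDemo(producerDelayMs = 0, consumerDelayMs = 5)
    println("slow consumer: producer waited $producerWaits times")
    val (_, _, consumerWaits) = runDemo(producerDelayMs = 5, consumerDelayMs = 0)
    println("slow producer: consumer waited $consumerWaits times")
}
```

The exact counts depend on scheduling, but comparing them over time gives a rough signal for tuning `capacity`: frequent producer waits point to a slow consumer, frequent consumer waits to a slow producer.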

Also, if you have similar needs, you can consider migrating to channels entirely, as they are generally more powerful tools for coordinating producers and consumers. That said, as the example above shows, the two can also be used together.