How to continue emitting even an exception occurred when emitting?

44 views Asked by At
        // library code
        val items = listOf(1, 2, 3)
        val flow =
            flow<Int> {
                for (item in items) {
                    val childFlow =
                        listOf(item, item, item)
                            .asFlow()
                            .catch {
                                println(it.message)
                            }
                            .onCompletion { error ->
                                println("completed child flow, error=${error?.message}")
                            }
                    try {
                        emitAll(
                            childFlow
                        )
                    } catch (ex: Exception) {
                        if (ex.message?.contains("not allowed") == true) {
                            println("transient exception when emitting ${ex.message}")
                            continue
                        }
                        throw ex
                    }
                }
            }
                .catch {
                    println(it.message)
                }
                .onEach {
                }

        // client code
        flow
            .onEach { item ->
                if (item == 2) {
                    throw Exception("not allowed")
                }
                println(item)
            }
            .collect()

when an exception happens in collector emitAll functions thrown exception, I want to try exception and continue to emit with next values but this cause following exception:

    Previous 'emit' call has thrown exception java.lang.Exception: not allowed, but then emission attempt of value '3' has been detected.
    Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
    For a more detailed explanation, please refer to Flow documentation.

I don't have full control on collector because it is client decision so I want to handle all exceptions that coming from collector side.

1

There are 1 answers

1
user2340612 On

If I understood your question correctly, you're asking for a way to continue emitting values even if the consumer/collector fails to process an emitted item.

I'm no coroutine expert, but the emitter and the collector are separate entities – which means if a collector throws an exception when processing an emitted item, that shouldn't affect the emitter nor another collector.

As an example, the following piece of code creates a Flow that is consumed by 2 collectors – the first one will throw an exception, while the second one won't (but it will still fully consume the flow)

fun main() {
    // create a flow
    val flow = flow {
        val itemsToEmit = 1..5
        emitAll(itemsToEmit.asFlow())
    }

    runBlocking {
        // use the supervisor scope so that when the first coroutine fails, it won't cancel the second one
        supervisorScope {
            // create 2 collectors
            repeat(2) { iterationNumber ->
                launch { FlowConsumer(shouldThrowException = iterationNumber % 2 == 1).consume(flow) }
            }
        }
    }
}

class FlowConsumer(private val shouldThrowException: Boolean) {
    suspend fun consume(flow: Flow<Int>) {
        flow
            .onEach {
                if (shouldThrowException && it == 2) throw Exception("Not allowed")
                println("[${Thread.currentThread()}] $it")
            }
            .collect()
    }
}