// library code
val items = listOf(1, 2, 3)
val flow =
    flow<Int> {
        for (item in items) {
            val childFlow =
                listOf(item, item, item)
                    .asFlow()
                    .catch {
                        println(it.message)
                    }
                    .onCompletion { error ->
                        println("completed child flow, error=${error?.message}")
                    }
            try {
                emitAll(childFlow)
            } catch (ex: Exception) {
                if (ex.message?.contains("not allowed") == true) {
                    println("transient exception when emitting ${ex.message}")
                    continue
                }
                throw ex
            }
        }
    }
        .catch {
            println(it.message)
        }
        .onEach {
        }

// client code
flow
    .onEach { item ->
        if (item == 2) {
            throw Exception("not allowed")
        }
        println(item)
    }
    .collect()
When the collector throws an exception, it is rethrown from the emitAll call. I want to catch that exception and continue emitting the next values, but doing so causes the following exception:
Previous 'emit' call has thrown exception java.lang.Exception: not allowed, but then emission attempt of value '3' has been detected.
Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
For a more detailed explanation, please refer to Flow documentation.
I don't have full control over the collector, because what it does is the client's decision, so I want to handle all exceptions coming from the collector side.
If I understood your question correctly, you're asking for a way to continue emitting values even if the consumer/collector fails to process an emitted item.
I'm no coroutine expert, but the emitter and the collector are separate entities – which means that if a collector throws an exception while processing an emitted item, that shouldn't affect the emitter or any other collector.
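To make the mechanics concrete, here is a minimal sketch of how a collector's failure travels, assuming kotlinx-coroutines-core 1.6 or newer. The helper name attemptedEmissions is hypothetical. The exception thrown by the collector is rethrown from the emit call inside the builder, so it ends that one collection; it does not change the flow itself, which can be collected again from scratch.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the values the builder tried to emit
// before the collection ended.
fun attemptedEmissions(): List<Int> = runBlocking {
    val attempted = mutableListOf<Int>()
    val source = flow {
        for (i in 1..3) {
            attempted += i
            emit(i) // a failure in the collector is rethrown from this call
        }
    }
    try {
        source.collect { value ->
            if (value == 2) throw Exception("not allowed")
        }
    } catch (ex: Exception) {
        // The exception reaches the caller of collect(); this collection is over.
        println("collection ended: ${ex.message}")
    }
    attempted
}

fun main() {
    println(attemptedEmissions()) // [1, 2]
}
```

The value 3 is never attempted here because the builder does not catch the exception and keep emitting, which is the pattern the error message in the question complains about.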
As an example, the following piece of code creates a Flow that is consumed by 2 collectors – the first one will throw an exception, while the second one won't (but it will still fully consume the flow):
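A minimal sketch of such an example, assuming kotlinx-coroutines-core 1.6 or newer. The names numbers, collectInto and NotAllowedException are hypothetical and chosen only for illustration. Because the flow is cold, each call to collect runs the builder again from the start, so the failure of the first collector has no effect on the second.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical exception type used by the failing collector.
class NotAllowedException(message: String) : Exception(message)

// Cold flow: the builder block runs once per collector.
fun numbers(): Flow<Int> = flow {
    for (i in 1..3) emit(i)
}

// Hypothetical helper: collects the flow and fails on `failOn` if it is non-null.
// Returns the values that were processed before any failure.
fun collectInto(source: Flow<Int>, failOn: Int?): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    try {
        source.collect { value ->
            if (value == failOn) throw NotAllowedException("not allowed")
            seen += value
        }
    } catch (ex: NotAllowedException) {
        println("collector failed: ${ex.message}")
    }
    seen
}

fun main() {
    val source = numbers()
    println(collectInto(source, failOn = 2))    // [1]
    println(collectInto(source, failOn = null)) // [1, 2, 3]
}
```

Note that the try/catch sits on the collector side, around collect, rather than around emit inside the builder; the builder itself never swallows the collector's exception.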