I have two flows of strings coming from a database. I want to merge these two flows into one flow of alternating elements.
If the resulting flow is not consumed entirely, the two input flows should be cancelled, so that the data fetching stops and the database resources can be closed.
My current draft of the combinedAlternately
function used two channels internally. This works fine, if both flows provide enough elements. But if at least one of the input flows is empty the corresponding channel can not be closed. In this case I get an ClosedReceiveChannelException
with the message "Channel was closed". I just don't understand, where the channel was closed previously. This is my example code:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlin.coroutines.EmptyCoroutineContext
@OptIn(ExperimentalCoroutinesApi::class)
private fun <T> CoroutineScope.combineAlternately(flow1: Flow<T>, flow2: Flow<T>): Flow<T> = flow {
val channel1 = Channel<T>(3)
val channel2 = Channel<T>(3)
val job1 = launch {
flow1.collect { channel1.send(it) }
println("channel1.isClosedForSend: " + channel1.isClosedForSend)
println("channel1.isClosedForReceive: " + channel1.isClosedForReceive)
channel1.close()
}
val job2 = launch {
flow2.collect { channel2.send(it) }
channel2.close()
}
while (currentCoroutineContext().isActive && (!channel1.isClosedForReceive || !channel2.isClosedForReceive)) {
if (!channel1.isClosedForReceive) emit(channel1.receive())
if (!channel2.isClosedForReceive) emit(channel2.receive())
}
job1.cancel()
job2.cancel()
}
fun main() = runBlocking {
val flow1 = flow {
var i = 0
try {
while (i < 8) emit("A${++i}")
} finally {
println("cancel flow1 upstream fetching")
}
}
val flow2 = flow {
var i = 0
try {
while (true) emit("B${++i}")
} finally {
println("cancel flow2 upstream fetching")
}
}
val flowScope = CoroutineScope(EmptyCoroutineContext)
flowScope.combineAlternately(flow1, flow2).take(33).collect {
println(it)
}
flowScope.cancel() // this is necessary to let the flows execute the finally block
}
Interestingly, when I run this in the Kotlin playground, I get this Exception only every other try. Some runs work as expected, but most of them lead to the exception: https://pl.kotl.in/AX7v_-0gm
When I change the loop in flow1
from while (i < 8)
to while (true)
the code works as expected. The problem only occurs, if the flow1
runs out of elements, so that close
has to be called on channel1
.
What am I doing wrong here?