ClosedReceiveChannelException though it should not be closed yet


I have two flows of strings coming from a database. I want to merge these two flows into one flow of alternating elements.

If the resulting flow is not consumed entirely, the two input flows should be cancelled, so that the data fetching stops and the database resources can be closed.
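The cancellation requirement can be illustrated in isolation. The following is a minimal sketch, assuming a hypothetical `rows` function as a stand-in for a database-backed flow and a `log` list used only to record what happened: when a downstream `take` has received enough elements, the upstream `flow` block is aborted and its `finally` block runs.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a database-backed flow (not part of the question's code).
fun rows(log: MutableList<String>): Flow<String> = flow {
    var i = 0
    try {
        while (true) emit("row${++i}")
    } finally {
        // This is where database resources would be released.
        log += "closed"
    }
}

fun main() = runBlocking {
    val log = mutableListOf<String>()
    // take(3) stops collecting after three elements, which aborts the upstream block.
    val taken = rows(log).take(3).toList()
    println(taken)
    println(log)
}
```

For a single cold flow no extra scope is needed. The open question is how to keep this behaviour once two flows are collected concurrently.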

My current draft of the combineAlternately function uses two channels internally. This works fine if both flows provide enough elements. But if at least one of the input flows runs out of elements, the corresponding channel cannot be closed cleanly. In this case I get a ClosedReceiveChannelException with the message "Channel was closed". I just don't understand where the channel was closed previously. This is my example code:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlin.coroutines.EmptyCoroutineContext

@OptIn(ExperimentalCoroutinesApi::class)
private fun <T> CoroutineScope.combineAlternately(flow1: Flow<T>, flow2: Flow<T>): Flow<T> = flow {
    val channel1 = Channel<T>(3)
    val channel2 = Channel<T>(3)

    val job1 = launch {
        flow1.collect { channel1.send(it) }
        println("channel1.isClosedForSend:    " + channel1.isClosedForSend)
        println("channel1.isClosedForReceive: " + channel1.isClosedForReceive)
        channel1.close()
    }

    val job2 = launch {
        flow2.collect { channel2.send(it) }
        channel2.close()
    }

    while (currentCoroutineContext().isActive && (!channel1.isClosedForReceive || !channel2.isClosedForReceive)) {
        if (!channel1.isClosedForReceive) emit(channel1.receive())
        if (!channel2.isClosedForReceive) emit(channel2.receive())
    }

    job1.cancel()
    job2.cancel()
}

fun main() = runBlocking {
    val flow1 = flow {
        var i = 0
        try {
            while (i < 8) emit("A${++i}")
        } finally {
            println("cancel flow1 upstream fetching")
        }
    }
    val flow2 = flow {
        var i = 0
        try {
            while (true) emit("B${++i}")
        } finally {
            println("cancel flow2 upstream fetching")
        }
    }

    val flowScope = CoroutineScope(EmptyCoroutineContext)
    flowScope.combineAlternately(flow1, flow2).take(33).collect {
        println(it)
    }
    flowScope.cancel() // this is necessary to let the flows execute the finally block
}

Interestingly, when I run this in the Kotlin playground, the exception does not occur on every run. Some runs work as expected, but most of them fail with the exception: https://pl.kotl.in/AX7v_-0gm

When I change the loop in flow1 from while (i < 8) to while (true), the code works as expected. The problem only occurs if flow1 runs out of elements, so that close has to be called on channel1.

What am I doing wrong here?
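
A likely cause is that the check and the receive are two separate steps. While channel1 is empty but not yet closed, isClosedForReceive is still false, so receive() suspends. If job1 then calls close(), the suspended receive() fails with ClosedReceiveChannelException. Whether this happens depends on timing between threads, which would explain why only some runs fail. The following is a minimal sketch of one way around it, not a definitive implementation: it uses receiveCatching(), which reports a closed channel as a result instead of throwing, and it launches the collectors inside coroutineScope so that they are cancelled when the downstream stops early. The names combineAlternatelySketch and forwardTo are hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Hypothetical helper: copies a flow into a channel and always closes the channel afterwards.
private suspend fun <T> Flow<T>.forwardTo(channel: Channel<T>) {
    try {
        collect { channel.send(it) }
    } finally {
        channel.close()
    }
}

// Hypothetical variant of combineAlternately that needs no external CoroutineScope.
fun <T> combineAlternatelySketch(flow1: Flow<T>, flow2: Flow<T>): Flow<T> = flow {
    coroutineScope {
        val channel1 = Channel<T>(3)
        val channel2 = Channel<T>(3)
        // Children of this scope are cancelled if the downstream aborts (for example via take).
        launch { flow1.forwardTo(channel1) }
        launch { flow2.forwardTo(channel2) }

        var open1 = true
        var open2 = true
        while (open1 || open2) {
            if (open1) {
                // receiveCatching() returns a closed result instead of throwing.
                val result = channel1.receiveCatching()
                if (result.isSuccess) emit(result.getOrThrow()) else open1 = false
            }
            if (open2) {
                val result = channel2.receiveCatching()
                if (result.isSuccess) emit(result.getOrThrow()) else open2 = false
            }
        }
    }
}

fun main() = runBlocking {
    val flowA = flow { for (i in 1..8) emit("A$i") }
    val flowB = flow {
        var i = 0
        try {
            while (true) emit("B${++i}")
        } finally {
            println("cancel flowB upstream fetching")
        }
    }
    combineAlternatelySketch(flowA, flowB).take(33).collect { println(it) }
}
```

Because the closed state is observed as the result of the receive itself, there is no window between checking and receiving. Once one input is exhausted, the sketch keeps emitting from the other one, which matches the loop condition in the original draft.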
