Flows - Cloning a flow without multiple iteration - am I doing it right?


I am just starting to familiarize myself with Kotlin flows.

For this, I am using them to parse the contents of a binary file which I will simulate using the following flow:

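import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow

// Simulates reading six records from a binary file, one every 100 ms.
fun testFlow() = flow {
    println("Starting loop")

    try {
        for (i in 0..5) {
            emit(i)
            delay(100)
        }

        println("Loop has finished")
    } finally {
        // Runs on normal completion, failure, and cancellation alike.
        println("Finally")
    }
}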

Now, I need the file contents several times, basically to extract different sets of information. However, I want to read the file only once, not once per consumer.
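To illustrate why simply collecting the flow once per consumer does not help: a cold flow re-runs its body for every collector. The following is only a minimal sketch; `countedFlow` and the `reads` counter are made-up names that stand in for the file read.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Illustration only: counts how often the flow body (the "file read") is started.
var reads = 0

fun countedFlow(): Flow<Int> = flow {
    reads++
    for (i in 0..5) emit(i)
}

fun main() = runBlocking {
    val src = countedFlow()
    val header = src.take(3).toList() // first collection starts the body once
    val body = src.drop(3).toList()   // second collection starts it again
    println("header=$header body=$body reads=$reads")
}
```

This prints `header=[0, 1, 2] body=[3, 4, 5] reads=2`, i.e. the "file" is read twice.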

As there doesn't seem to be a built-in mechanism to clone / duplicate a flow, I developed the following helper function:

interface MultiConsumeBlock<T> {
    suspend fun subscribe(): Flow<T>
}

suspend fun <T> Flow<T>.multiConsume(capacity: Int = DEFAULT_CONCURRENCY, scope: CoroutineScope? = null, block: suspend MultiConsumeBlock<T>.() -> Unit) {
    val channel = buffer(capacity).broadcastIn(scope ?: CoroutineScope(coroutineContext))

    val context = object : MultiConsumeBlock<T> {
        override suspend fun subscribe(): Flow<T> {
            val subscription = channel.openSubscription()
            return flow { emitAll(subscription) }
        }
    }
    try {
        block(context)
    } finally {
        channel.cancel()
    }
}

which I then use like this (think of the analogy to the file: flow a gets every record, flow b everything after the first 3 records, and flow c only the first 3 records, i.e. the "file header"):

fun main() = runBlocking {
    val src = testFlow()

    src.multiConsume {
        val a = subscribe().map { it }
        val b = subscribe().drop(3).map { it + it }
        val c = subscribe().take(3).map { it * it }

        // Collect every subscription in its own coroutine and wait for all of them.
        mapOf("A" to a, "B" to b, "C" to c)
            .map { task -> launch { task.value.collect { println("${task.key}: $it") } } }
            .joinAll()
    }
}

Output:

Starting loop
A: 0
C: 1
A: 1
C: 2
A: 4
C: 3
A: 9
C: 4
A: 16
C: 5
B: 10
C: 6
B: 12
C: 7
B: 14
C: 8
B: 16
C: 9
B: 18
C: 10
B: 20
C: 11
Loop has finished
Finally

This looks good so far. However, I am unsure whether I am using Kotlin's flows correctly in this regard.
Am I opening myself up to deadlocks, missed exceptions, etc.?

The documentation just states:

All implementations of the Flow interface must adhere to two key properties described in detail below:

  • Context preservation.
  • Exception transparency.
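As far as I understand the second property, an exception thrown by the collector has to travel back through the flow untouched, and operators such as `catch` only see upstream failures. A minimal sketch of that understanding (the function name `collectUntilRejected` is made up for this example):

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Returns: values seen by the collector, whether catch {} ran, and the failure message.
suspend fun collectUntilRejected(): Triple<List<Int>, Boolean, String?> {
    val seen = mutableListOf<Int>()
    var catchInvoked = false
    val failure = try {
        flowOf(0, 1, 2, 3, 4, 5)
            .catch { catchInvoked = true } // only handles failures of the upstream flow
            .collect { value ->
                // The collector itself fails on the fourth record.
                require(value < 3) { "collector rejected $value" }
                seen += value
            }
        null
    } catch (e: IllegalArgumentException) {
        e
    }
    return Triple(seen, catchInvoked, failure?.message)
}

fun main() = runBlocking {
    println(collectUntilRejected())
}
```

Here the collector's exception reaches the surrounding `try`, and the `catch` operator is never invoked, so the result is `([0, 1, 2], false, collector rejected 3)`.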

But I am unsure if that's the case for my implementation or if I am missing something.
Or maybe there is a better way altogether?
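One alternative I could imagine, sketched here without `broadcastIn`, is to push the records through a `MutableSharedFlow`. This assumes a kotlinx.coroutines version that provides `MutableSharedFlow` (1.4 or later, as far as I know). The names `fanOut`, `Done` and `demo` are made up for this example. Because a shared flow never completes by itself, the sketch sends a sentinel to mark the end of the file. It also assumes that every consumer collects the flow it is given exactly once, since otherwise the wait for subscribers would never finish.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical sentinel that marks the end of the upstream flow.
private object Done

// Hypothetical helper: collects the receiver once and forwards every value to all consumers.
// Assumption: each consumer collects the flow it is given exactly once.
@Suppress("UNCHECKED_CAST")
suspend fun <T> Flow<T>.fanOut(vararg consumers: suspend (Flow<T>) -> Unit): Unit = coroutineScope {
    // No replay, no buffer: emit() suspends until every active subscriber has taken the value.
    val shared = MutableSharedFlow<Any?>()
    for (consumer in consumers) {
        launch { consumer(shared.takeWhile { it !== Done }.map { it as T }) }
    }
    // Start reading only after all consumers have subscribed, so nobody misses the first records.
    shared.subscriptionCount.first { it >= consumers.size }
    this@fanOut.collect { shared.emit(it) }
    shared.emit(Done) // lets the remaining consumers complete
}

suspend fun demo(): Map<String, List<Int>> {
    val all = mutableListOf<Int>()
    val header = mutableListOf<Int>()
    val body = mutableListOf<Int>()
    flowOf(0, 1, 2, 3, 4, 5).fanOut(
        { records -> records.collect { all += it } },
        { records -> records.take(3).collect { header += it } },
        { records -> records.drop(3).collect { body += it } },
    )
    return mapOf("all" to all, "header" to header, "body" to body)
}

fun main() = runBlocking {
    println(demo())
}
```

With this sketch the source is collected a single time and `main` prints `{all=[0, 1, 2, 3, 4, 5], header=[0, 1, 2], body=[3, 4, 5]}`. A failure in the source or in any consumer cancels the whole `coroutineScope`, which is the behaviour I would want for a file parser, but I have not checked it against every edge case.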
