I'm trying to use coroutines and the Arrow library function parMapNotNull to parallelize some code in a scientific computation library I'm working on. Everywhere I read, the function kotlinx.coroutines.runBlocking gets used and recommended for going from a serial, blocking code context to a coroutine code context.
However, I keep getting a compilation error:
e: file:///Users/mik/IdeaProjects/tda4j/src/commonMain/kotlin/org/appliedtopology/tda4j/ParallelSymmetricSimplexIndexVietorisRips.kt:16:9 Unresolved reference: runBlocking
I am able to resolve the issue specifically for a JVM target, but even with imports of kotlinx-coroutines-core-js, I get an unresolved reference for any JS compilation of the code.
My build.gradle.kts includes:
    sourceSets {
        val commonMain by getting {
            dependencies {
                implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
                implementation("io.arrow-kt:arrow-core:1.2.1")
                implementation("io.arrow-kt:arrow-fx-coroutines:1.2.1")
                implementation("space.kscience:kmath-core:0.3.1")
                implementation("space.kscience:kmath-tensors:0.3.1")
            }
        }
        val commonTest by getting {
            dependencies {
                implementation("io.kotest:kotest-framework-engine:5.7.2")
                implementation("io.kotest:kotest-assertions-core:5.7.2")
                implementation("io.kotest:kotest-property:5.7.2")
            }
        }
        val jvmMain by getting {
            dependencies {
                implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.7.3")
            }
        }
        val jvmTest by getting {
            dependencies {
                implementation("io.kotest:kotest-runner-junit5:5.7.2")
            }
        }
        val jsMain by getting {
            dependencies {
                implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core-js:1.7.3")
            }
        }
    }
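For comparison, here is a trimmed sketch of the coroutines part of that block. It assumes Gradle module metadata resolution (the default in current Gradle versions), in which case the common kotlinx-coroutines-core artifact should already resolve to the right platform variant for each target, making the separate -jvm and -js entries redundant. Note that this does not, by itself, make runBlocking visible in commonMain: in kotlinx.coroutines 1.7.3 that function is provided for JVM and Native targets only, not for JS.

```kotlin
kotlin {
    sourceSets {
        val commonMain by getting {
            dependencies {
                // The common artifact resolves to the matching platform variant per target,
                // so jvmMain and jsMain need no extra coroutines entries (assumption: Gradle
                // module metadata is enabled, which is the default).
                implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
            }
        }
    }
}
```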
Edited to add, 2023-11-12:
Following up on Marko Topolnik's comment, I have a structure in my code that looks somewhat like this:
    class Foo {
        val cache: Array<Sequence<stuff>> = Array(somesize) { emptySequence<stuff>() }
        val scope = CoroutineScope(Dispatchers.Default)

        fun doStuff(d: Int): Sequence<stuff> {
            // Blocks the calling thread until the parallel work has finished.
            runBlocking(Dispatchers.Default) {
                if (cache[d].none()) {
                    val stuffJob = async {
                        somecollection.parMapNotNull { doSomethingExpensive }
                            .map { convertresult }
                            .sorted()
                            .asSequence()
                    }
                    cache[d] = stuffJob.await()
                }
            }
            return cache[d]
        }
    }
As long as I don't care at all about JS, this seems to actually work. But part of my reason for picking Kotlin in the first place was the multi-platform support. If I write this with scope.launch instead of runBlocking, then nothing returns.
Elsewhere in the library, I'm using runBlocking to allow me to juggle a mutex to write to a caching data structure in code that gets called from the doSomethingExpensive here.
Wherever I turn, it looks like in order to use launch I need to already be in a coroutine, and the thing to use to transition into or out of a coroutine context is runBlocking, except then the code is no longer multi-platform.
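One multiplatform-friendly shape for this, offered only as a sketch, is to make doStuff itself a suspend function so that common code never has to block; the decision of how to enter the coroutine world is then left to each platform's entry point. Everything named here is hypothetical: SquaresCache and expensiveOrNull stand in for Foo and doSomethingExpensive, plain async/awaitAll/filterNotNull is swapped in for Arrow's parMapNotNull to keep the example dependent on kotlinx.coroutines alone, and a nullable cache slot is used to tell "not computed yet" apart from "computed, but empty".

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.withContext

// Hypothetical stand-in for the expensive per-element computation.
fun expensiveOrNull(x: Int): Int? = if (x % 2 == 0) x * x else null

class SquaresCache(size: Int) {
    // null means "not computed yet"; an empty list is a valid cached result.
    // Not synchronized: concurrent callers would need a Mutex or similar.
    private val cache: Array<List<Int>?> = arrayOfNulls<List<Int>>(size)

    // A suspend function compiles in commonMain; it suspends instead of blocking.
    suspend fun doStuff(d: Int): List<Int> {
        cache[d]?.let { return it }
        val result = withContext(Dispatchers.Default) {
            (0..d).map { x -> async { expensiveOrNull(x) } } // one coroutine per element
                .awaitAll()
                .filterNotNull()
                .sorted()
        }
        cache[d] = result
        return result
    }
}
```

On the JVM, the outermost caller (a main function or a test) could wrap the call in runBlocking; on JS the call would have to come from a coroutine started with launch or promise, and tests could use runTest from kotlinx-coroutines-test. The trade-off is that the suspend modifier spreads to every caller of doStuff.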
I tried doing something like:
    class Foo {
        val cache: Array<Sequence<stuff>> = Array(somesize) { emptySequence<stuff>() }
        val scope = CoroutineScope(Dispatchers.Default)

        fun doStuff(d: Int): Sequence<stuff> {
            scope.launch(Dispatchers.Default) {
                if (cache[d].none()) {
                    val stuffJob = async {
                        somecollection.parMapNotNull { doSomethingExpensive }
                            .map { convertresult }
                            .sorted()
                            .asSequence()
                    }
                    cache[d] = stuffJob.await()
                }
            }.join() // does not compile: join() is a suspend function
            return cache[d]
        }
    }
But I'm not allowed to use join if I'm not in a coroutine context already, so I can't use it to wait for my parallel jobs to finish before compiling and returning the results.
I also tried doing something like:
    class Foo {
        val cache: Array<Sequence<stuff>> = Array(somesize) { emptySequence<stuff>() }
        val scope = CoroutineScope(Dispatchers.Default)

        fun doStuff(d: Int): Sequence<stuff> {
            scope.launch(Dispatchers.Default) {
                if (cache[d].none()) {
                    val stuffJob = async {
                        somecollection.parMapNotNull { doSomethingExpensive }
                            .map { convertresult }
                            .sorted()
                            .asSequence()
                    }
                    cache[d] = stuffJob.await()
                }
            }
            // Busy-wait until the launched coroutine has filled the cache slot.
            while (cache[d].none()) { }
            return cache[d]
        }
    }
But when I run this to test things out on the JVM (my known working platform), it seems to go into an infinite loop, spending a LOT more time on the test code than before.
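A few things could plausibly contribute to that behaviour: the spinning thread burns a core while it waits, a plain array write made on another thread has no guaranteed visibility to the spinning thread without synchronization, and a computation whose result is legitimately empty would never satisfy the exit condition. On JS the single thread would never get a chance to run the launched coroutine at all. If doStuff has to remain a non-suspending function, one non-blocking alternative is to hand the result to a callback instead of returning it. The sketch below assumes hypothetical names throughout (CallbackFoo, doStuffAsync, computeSorted) and replaces the real computation with a trivial one.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch

// Hypothetical stand-in for the parallel computation in the post.
suspend fun computeSorted(d: Int): List<Int> = (d downTo 0).map { it * it }.sorted()

class CallbackFoo(private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default)) {
    // Returns immediately; onResult is invoked from the coroutine once the result is ready.
    // The returned Job lets coroutine-aware callers join or cancel the work.
    fun doStuffAsync(d: Int, onResult: (List<Int>) -> Unit): Job =
        scope.launch { onResult(computeSorted(d)) }
}
```

This needs neither runBlocking nor a spin loop, so it should compile for every target, at the cost of callers having to deal with a result that arrives later.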