I have a requirement, where we want to asynchronously handle some upstream request/payload via coroutine. I see that there are several ways to do this, but wondering which is the right approach -
- Provide explicit spring service class that implements CoroutineScope
- Autowire singleton scope-context backed by certain defined thread-pool dispatcher.
- Define method local
CoroutineScope
object
Following on this question, I'm wondering whats the trade-off if we define method local scopes like below -
fun testSuspensions(count: Int) {
val launchTime = measureTimeMillis {
val parentJob = CoroutineScope(Dispatchers.IO).launch {
repeat(count) {
this.launch {
process() //Some lone running process
}
}
}
}
}
Alternative approach to autowire
explicit scope object backed by custom dispatcher -
@KafkaListener(
topics = ["test_topic"],
concurrency = "1",
containerFactory = "someListenerContainerConfig"
)
private fun testKafkaListener(consumerRecord: ConsumerRecord<String, ByteArray>, ack: Acknowledgment) {
try {
this.coroutineScope.launch {
consumeRecordAsync(consumerRecord)
}
} finally {
ack.acknowledge()
}
}
suspend fun consumeRecordAsync(record: ConsumerRecord<String, ByteArray>) {
println("[${Thread.currentThread().name}] Starting to consume record - ${record.key()}")
val statusCode = initiateIO(record) // Add error-handling depending on kafka topic commit semantics.
// Chain any-other business logic (depending on status-code) as suspending functions.
consumeStatusCode(record.key(), statusCode)
}
suspend fun initiateIO(record: ConsumerRecord<String, ByteArray>): Int {
return withContext(Dispatchers.IO) { // Switch context to IO thread for http.
println("[${Thread.currentThread().name}] Executing network call - ${record.key()}")
delay(1000 * 2) // Simulate IO call
200 // Return status-code
}
}
suspend fun consumeStatusCode(recordKey: String, statusCode: Int) {
delay(1000 * 1) // Simulate work.
println("[${Thread.currentThread().name}] consumed record - $recordKey, status-code - $statusCode")
}
Autowiring bean as follows in some upstream config class -
@Bean(name = ["testScope"])
fun defineExtensionScope(): CoroutineScope {
val threadCount: Int = 4
return CoroutineScope(Executors.newFixedThreadPool(threadCount).asCoroutineDispatcher())
}
It depends on what your goal is. If you just want to avoid the thread-per-request model, you can use Spring's support for
suspend
functions in controllers instead (by using webflux), and that removes the need from even using an external scope at all:If you really want your method to return immediately and schedule coroutines that outlive it, you indeed need that extra scope.
Regarding option 1), making custom classes implement
CoroutineScope
is not encouraged anymore (as far as I understood). It's usually suggested to use composition instead (declare a scope as a property instead of implementing the interface by your own classes). So I would suggest your option 2.I would say option 3) is out of the question, because there is no point in using
CoroutineScope(Dispatchers.IO).launch { ... }
. It's no better than usingGlobalScope.launch(Dispatchers.IO) { ... }
(it has the same pitfalls) - you can read about the pitfalls ofGlobalScope
in its documentation.The main problem being that you run your coroutines outside structured concurrency (your running coroutines are not children of a parent job and may accumulate and hold resources if they are not well behaved and you forget about them). In general it's better to define a scope that is cancelled when you no longer need any of the coroutines that are run by it, so you can clean rogue coroutines.
That said, in some circumstances you do need to run coroutines "forever" (for the whole life of your application). In that case it's ok to use
GlobalScope
, or a custom application-wide scope if you need to customize things like the thread pool or exception handler. But in any case don't create a scope on the spot just to launch a coroutine without keeping a handle to it.In your case, it seems you have no clear moment when you wouldn't care about the long running coroutines anymore, so you may be ok with the fact that your coroutines can live forever and are never cancelled. In that case, I would suggest a custom application-wide scope that you would wire in your components.