Kotlin Synchronize Thread with Coroutine

Currently I have a main thread that constantly loops:

var suspension = Suspension()

fun loop() {
    // Doing stuff here...
        
    suspension.callTick()
        
    // Doing more stuff here...
}

It calls the callTick() method which sends data to a channel:

class Suspension {
    private val ticksChannel = Channel<Unit>(0)

    fun callTick() {
        ticksChannel.trySend(Unit)
    }

    suspend fun waitTick() {
        ticksChannel.receive()
    }
}
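
One thing to keep in mind with this class: Channel<Unit>(0) is a rendezvous channel, so trySend only succeeds while another coroutine is already suspended in receive(). Otherwise the tick is dropped. Below is a minimal sketch of that behavior, assuming kotlinx-coroutines-core is on the classpath; trySendDemo is a hypothetical name used only for illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical demo: reports whether trySend succeeded before and after a receiver was waiting.
fun trySendDemo(): Pair<Boolean, Boolean> = runBlocking {
    val ticks = Channel<Unit>(Channel.RENDEZVOUS) // same capacity as Channel<Unit>(0)

    // Nobody is suspended in receive() yet, so this tick is dropped.
    val before = ticks.trySend(Unit).isSuccess

    val receiver = launch { ticks.receive() }
    yield() // let the receiver start and suspend in receive()

    // A receiver is waiting now, so the tick is handed over.
    val after = ticks.trySend(Unit).isSuccess
    receiver.join()

    before to after
}

fun main() {
    println(trySendDemo()) // (false, true)
}
```

If the task may still be busy when a tick arrives, a conflated or buffered channel would probably be a better fit, since it keeps the tick instead of dropping it.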

Now my last class makes use of this:

class Task(private val suspension: Suspension) {
    suspend fun runTask() {
        while (true) {
            suspension.waitTick()

            someMethodOnTheMainThread()
        }
    }
}

Now I'm wondering how I can call the someMethodOnTheMainThread() method from the main thread. The function has to be called right after the 'suspension.callTick()' method from loop(). At the moment I'm running the function from the coroutine thread. This causes a lot of errors and null pointer exceptions because it's not synchronized with the main thread.

Basically, I'm wondering how to block the main thread until suspension.waitTick() has returned and the code after it has run. Is this too complex? Is there another way to make suspending functions work with synchronized code?

There are 3 answers

0
Nukeroni On

Ok, so I have tried to implement my own dispatcher for the Client thread as follows:

/**
 * Dispatches execution onto Client event dispatching thread and provides native [delay] support.
 */
@Suppress("unused")
val Dispatchers.Client : ClientDispatcher
    get() = Client

/**
 * Dispatcher for Client event dispatching thread.
 *
 * This class provides type-safety and a point for future extensions.
 */
@OptIn(InternalCoroutinesApi::class)
sealed class ClientDispatcher : MainCoroutineDispatcher(), Delay {
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit = ClientUtility.invokeLater(block)

    @OptIn(ExperimentalCoroutinesApi::class)
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val timer = schedule(timeMillis, TimeUnit.MILLISECONDS, ActionListener {
            with(continuation) { resumeUndispatched(Unit) }
        })
        continuation.invokeOnCancellation { timer.stop() }
    }

    /** @suppress */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        val timer = schedule(timeMillis, TimeUnit.MILLISECONDS, ActionListener {
            block.run()
        })
        return object : DisposableHandle {
            override fun dispose() {
                timer.stop()
            }
        }
    }

    private fun schedule(time: Long, unit: TimeUnit, action: ActionListener): Timer =
        Timer(unit.toMillis(time).coerceAtMost(Int.MAX_VALUE.toLong()).toInt(), action).apply {
            isRepeats = false
            start()
        }
}

@OptIn(InternalCoroutinesApi::class)
internal class ClientDispatcherFactory : MainDispatcherFactory {
    override val loadPriority: Int
        get() = 0

    override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher = Client
}

private object ImmediateClientDispatcher : ClientDispatcher() {
    override val immediate: MainCoroutineDispatcher
        get() = this

    override fun isDispatchNeeded(context: CoroutineContext): Boolean = !ClientUtility.isEventDispatchThread()

    @OptIn(InternalCoroutinesApi::class)
    override fun toString() = toStringInternalImpl() ?: "Client.immediate"
}

/**
 * Dispatches execution onto Client event dispatching thread and provides native [delay] support.
 */
internal object Client : ClientDispatcher() {
    override val immediate: MainCoroutineDispatcher
        get() = ImmediateClientDispatcher

    @OptIn(InternalCoroutinesApi::class)
    override fun toString() = toStringInternalImpl() ?: "Client"
}

It is practically the Swing dispatcher. Mainly I have changed the invokeLater method to my own implementation. Another noteworthy modification is that I use my own isEventDispatchThread implementation.

I am still using the Swing Timer class. I'm not sure if that's ok to do... But it seems to work from some testing.

For the invokeLater and isEventDispatchThread implementation:

    val eventQueue = ConcurrentLinkedQueue<Runnable>()

    // Written by the loop thread and read from coroutine threads, hence @Volatile.
    @Volatile
    var dispatchThread: Thread? = null

    fun invokeLater(block: Runnable) {
        eventQueue.add(block)
    }

    // False until the loop thread has registered itself in dispatchThread.
    fun isEventDispatchThread() = Thread.currentThread() === dispatchThread

Then in the loop method:

    fun loop() {
        // Doing stuff here

        suspension.callTick()

        ClientUtility.dispatchThread = Thread.currentThread()
        // poll() removes each block before running it, so a block that throws is not run again on the next tick.
        while (true) {
            val block = ClientUtility.eventQueue.poll() ?: break
            block.run()
        }

        // Doing some more stuff here
    }

I now start the coroutine with Dispatchers.Main. The dispatcher behind Dispatchers.Main is the one I registered in the resources/META-INF/services/kotlinx.coroutines.internal.MainDispatcherFactory file, which contains the fully qualified name of my ClientDispatcherFactory class.

0
Sergio On

To run someMethodOnTheMainThread() on the main thread while suspending the calling coroutine, you can define it like this:

suspend fun someMethodOnTheMainThread() = withContext(Dispatchers.Main) {
    // ... your code here
}

withContext switches the coroutine to the given context (here Dispatchers.Main), runs the block on the main thread, and suspends the caller until the block has finished.

Dispatchers.Main is available on Android if we use the kotlinx-coroutines-android artifact. Similarly, on JavaFX if we use kotlinx-coroutines-javafx, and on Swing if we use kotlinx-coroutines-swing. There are probably some other libraries that set it. If you don't use any of them, this dispatcher is not configured, and it cannot be used, so you need to create your own dispatcher.
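
If registering a Dispatchers.Main replacement is more than you need, a plain CoroutineDispatcher can also do the job. This is only a rough sketch under some assumptions: LoopDispatcher, drain() and runDemo() are hypothetical names, and the while loop in runDemo() stands in for the real loop() from the question. Blocks are queued by dispatch() and only run when the loop thread calls drain().

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

// Hypothetical dispatcher: blocks wait in a queue until the loop thread drains them.
object LoopDispatcher : CoroutineDispatcher() {
    private val queue = ConcurrentLinkedQueue<Runnable>()

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        queue.add(block)
    }

    // Call once per loop iteration, from the loop thread only.
    fun drain() {
        while (true) {
            val block = queue.poll() ?: break
            block.run()
        }
    }
}

// The body runs on whichever thread calls LoopDispatcher.drain().
suspend fun someMethodOnTheMainThread(): Thread = withContext(LoopDispatcher) {
    Thread.currentThread()
}

// Returns true if the body above ran on the thread that drained the queue.
fun runDemo(): Boolean {
    val loopThread = Thread.currentThread()
    val seen = AtomicReference<Thread?>()
    val job = CoroutineScope(Dispatchers.Default).launch {
        seen.set(someMethodOnTheMainThread())
    }
    while (!job.isCompleted) { // stand-in for the real loop()
        LoopDispatcher.drain()
        Thread.sleep(1)
    }
    return seen.get() === loopThread
}

fun main() {
    println(runDemo())
}
```

In the question's setup, drain() would be called right after suspension.callTick(), so the queued work always runs at that point in the loop and on the loop's thread.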

1
Matt Timmermans On

If you're not using a framework that takes control of the main thread (and would provide a main thread dispatcher), you can do something like this:

var mainScope: CoroutineScope? = null

fun main() {
    runBlocking {
        mainScope = this
        while (true) {
            //...
            suspension.callTick()
            // allow other coroutines to run
            yield()
            //...
        }
    }
}

And then if you want to run something on the main thread from elsewhere, you can just use mainScope?.launch. The launched block runs on the main thread the next time the loop reaches yield().
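
Here is a small self-contained sketch of that idea, assuming a worker thread is the "elsewhere". The names runOnMainDemo and ranOn are hypothetical, and the loop exits once the block has run only so that the demo terminates.

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlin.concurrent.thread
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Returns true if the block launched from the worker thread ran on the caller's thread.
fun runOnMainDemo(): Boolean {
    val mainThread = Thread.currentThread()
    val mainScope = AtomicReference<CoroutineScope?>()
    val ranOn = AtomicReference<Thread?>()

    runBlocking {
        mainScope.set(this)

        // Hypothetical worker: hands a block to the main loop from another thread.
        thread {
            mainScope.get()?.launch { ranOn.set(Thread.currentThread()) }
        }

        while (ranOn.get() == null) { // stand-in for while (true)
            // ... per-iteration work ...
            yield() // queued coroutines run here, on the main thread
        }
    }
    return ranOn.get() === mainThread
}

fun main() {
    println(runOnMainDemo())
}
```

The scope is kept in an AtomicReference here so the worker thread is guaranteed to see it; a @Volatile property would serve the same purpose.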