I have a producer-consumer scenario. Here is the producer block, which continuously feeds elements into a LinkedBlockingQueue:
    suspend fun produceElements() {
        // long-running action to fetch the element
        val item = fetchItem()
        // adds the element to the queue
        linkedBlockingQ.add(item)
    }
I have another suspend function which acts as the consumer:
    suspend fun receiveItems() {
        while (true) {
            val item = linkedBlockingQ.take() // waits until the first element arrives
            // run some long-running tasks
            if (linkedBlockingQ.isEmpty()) break
        }
    }
Finally, in the caller method:
    suspend fun doMain() {
        val receiveTask = async { receiveItems() }
        val produceTask = async { produceElements() }
        produceTask.await() // wait until this is complete
        receiveTask.await() // wait until the receive task is complete
    }
My concern is that when there is a delay in producing elements, the LinkedBlockingQueue can be momentarily empty, so the isEmpty() check ends the receive task before all elements have arrived. Is this the correct approach for this scenario?
I thought about creating an example with Channels to show why this could be done better with Channels. The code is divided into 2 implementations: the first with the LinkedBlockingQueue example, similar to yours, and the second with a Channel. First the code:
And the result is this:
So with the LinkedBlockingQueue we are still missing some sort of management to get the last result. Notice also that I had to make the code use coroutine contexts explicitly by specifying dispatchers. Even if we know that we need to wait for 2 elements, we still need to make multiple checks on the queue, which is not ideal: these are blocking operations, which in other words hinder production times. A LinkedBlockingQueue therefore isn't ideal for coroutines.

However, a Channel is! In this case I'm specifying a 2-element buffer, but this can vary. We could also have just created a rendezvous channel and this would still work. The point is that with channels we don't need to figure out a way to wait for the last element, and functions only get suspended. They don't block each other, so receiving on a channel doesn't hinder sending on a channel.

To anyone playing along with this example, feel free to change the PRODUCTION_COST_MS value and observe whether the results differ. I just felt it was important to share this extended version of your example with the two implementations, to make it clearer for anyone who wants to learn more from this great question of yours.
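For reference, here is a minimal sketch of the Channel side only. It is not the original listing: `fetchItem(id)`, the `count` parameter, and the 100 ms value of `PRODUCTION_COST_MS` are assumptions made for illustration. The idea is that the producer closes the channel when it is done, so the consumer can simply iterate until the channel is closed and drained, with no `isEmpty()` check:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Assumed value for this sketch; change it to simulate slower production.
const val PRODUCTION_COST_MS = 100L

// Hypothetical stand-in for the long-running fetchItem() from the question.
suspend fun fetchItem(id: Int): String {
    delay(PRODUCTION_COST_MS) // suspends the coroutine without blocking the thread
    return "item-$id"
}

// Producer: sends `count` items, then closes the channel to signal completion.
suspend fun produceElements(channel: SendChannel<String>, count: Int) {
    try {
        repeat(count) { id -> channel.send(fetchItem(id)) }
    } finally {
        channel.close() // the consumer's loop ends once remaining items are received
    }
}

// Consumer: the for loop suspends while the channel is empty and
// finishes only after the channel has been closed and drained.
suspend fun receiveItems(channel: ReceiveChannel<String>): List<String> {
    val received = mutableListOf<String>()
    for (item in channel) {
        received += item // long-running work on each item would go here
    }
    return received
}

suspend fun doMain(count: Int = 2): List<String> = coroutineScope {
    val channel = Channel<String>(capacity = 2) // 2-element buffer
    val receiveTask = async { receiveItems(channel) }
    val produceTask = async { produceElements(channel, count) }
    produceTask.await()  // wait until production is complete
    receiveTask.await()  // returns every item that was sent
}

fun main() = runBlocking {
    println(doMain()) // prints [item-0, item-1]
}
```

Because completion is signalled by `close()` rather than inferred from an empty queue, a slow producer cannot cause the receive task to finish early. Replacing `Channel<String>(capacity = 2)` with `Channel<String>()` gives a rendezvous channel and should produce the same list.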