How does concurrent modification work with coroutines?

I am working through the Kotlin coroutines hands-on tutorial, in the Channels section. One task there is to fetch contributors concurrently while showing intermediate progress at the same time, using channels.

Below is a snippet of the proposed solution:

    suspend fun loadContributorsChannels(
        service: GitHubService,
        req: RequestData,
        updateResults: suspend (List<User>, completed: Boolean) -> Unit
    ) = coroutineScope {
        // ... (elided in the original snippet)

        val channel = Channel<List<User>>()
        for (repo in repos) {
            launch {
                val users = service.getRepoContributors(req.org, repo.name) // suspend function
                    .also { logUsers(repo, it) }
                    .bodyList()
                channel.send(users) // suspend function
            }
        }
        var allUsers = emptyList<User>()
        repeat(repos.size) {
            val users = channel.receive()  // suspend function
            allUsers = (allUsers + users).aggregate()
            updateResults(allUsers, it == repos.lastIndex) // suspend function
        }
    }

The function loadContributorsChannels() is called from a coroutine that runs on the Default dispatcher (Dispatchers.Default), as set up elsewhere in the tutorial. I have two questions:

  1. In the snippet above, is allUsers modified concurrently, given that we are already inside a coroutine running on the Default dispatcher?

  2. If I reorder the code as shown below, why do I get incorrect results? How does the code above differ from the snippet below?

    val contributorsChannel = Channel<List<User>>()
    var contributors = emptyList<User>()
    
    for (repo in repos) {
        launch {
            val contributorsPerRepo = service
                .getRepoContributors(req.org, repo.name) // suspend function
                .also { logUsers(repo, it) }
                .bodyList()
    
            contributors = (contributors + contributorsPerRepo).aggregate()
            contributorsChannel.send(contributors)  // suspend function
        }
    }
    
    repeat(repos.size) {
        updateResults(contributorsChannel.receive(), it == repos.lastIndex) // suspend functions
    }
    

Is this because of concurrent modification or am I missing something?

Best answer, by Marko Topolnik:

In the original code, the top-level coroutine is the only one that uses allUsers; it is that coroutine's local state.

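In your code, contributors is a variable shared by all the launched coroutines, and they update it concurrently. On Dispatchers.Default those coroutines can run in parallel on different threads, so the unsynchronized read-modify-write contributors = (contributors + contributorsPerRepo).aggregate() can lose updates when two coroutines read the same old list. In addition, a coroutine holding an older list may reach send() after one holding a newer list, so the last value received (the one reported as completed) is not guaranteed to be the complete one.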
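To make the shared-variable problem concrete, here is a minimal sketch that is not taken from the tutorial. It assumes a simplified User class and aggregate() function as stand-ins for the tutorial's own (both hypothetical here). aggregateShared mirrors the unsynchronized update and may lose contributions when run on Dispatchers.Default; aggregateWithMutex guards the same read-modify-write with kotlinx.coroutines.sync.Mutex so that no update is lost.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical stand-in for the tutorial's User type (assumption).
data class User(val login: String, val contributions: Int)

// Hypothetical stand-in for aggregate(): sums contributions per login,
// most active contributor first (assumption).
fun List<User>.aggregate(): List<User> =
    groupBy { it.login }
        .map { (login, group) -> User(login, group.sumOf { it.contributions }) }
        .sortedByDescending { it.contributions }

// Racy: every coroutine does an unsynchronized read-modify-write of the
// shared var, so on a multithreaded dispatcher some updates may be lost.
suspend fun aggregateShared(perRepo: List<List<User>>): List<User> = coroutineScope {
    var contributors = emptyList<User>()
    perRepo.map { users ->
        launch(Dispatchers.Default) {
            contributors = (contributors + users).aggregate()
        }
    }.joinAll()
    contributors
}

// Safe: the Mutex lets only one coroutine at a time read and replace the list.
suspend fun aggregateWithMutex(perRepo: List<List<User>>): List<User> = coroutineScope {
    val mutex = Mutex()
    var contributors = emptyList<User>()
    perRepo.map { users ->
        launch(Dispatchers.Default) {
            mutex.withLock {
                contributors = (contributors + users).aggregate()
            }
        }
    }.joinAll()
    contributors
}
```

With 100 repos that each report one contribution from the same user, aggregateWithMutex always returns a single entry with 100 contributions, while aggregateShared may return less. A Mutex only prevents lost updates; it does not order the snapshots that coroutines later send to a channel, which is one reason the channel fan-in in the original solution is the simpler fix.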

The original code correctly uses the Channel as a synchronization mechanism: it fans in the results of all the concurrent computations to a single coroutine, which collects them and uses them.
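The fan-in pattern can be sketched in a self-contained way as follows. This is an illustration under assumptions, not the tutorial's code: fetchContributors is a hypothetical stand-in for service.getRepoContributors(...).bodyList(), and User and aggregate() are simplified stand-ins. Producers only send their own per-repo lists; the single consumer owns allUsers, so no lock is needed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical stand-in for the tutorial's User type (assumption).
data class User(val login: String, val contributions: Int)

// Hypothetical stand-in for aggregate(): sums contributions per login (assumption).
fun List<User>.aggregate(): List<User> =
    groupBy { it.login }
        .map { (login, group) -> User(login, group.sumOf { it.contributions }) }
        .sortedByDescending { it.contributions }

// Hypothetical stand-in for the network call; the delay simulates latency
// so that results arrive in varying order.
suspend fun fetchContributors(repo: String): List<User> {
    delay(10L * repo.length)
    return listOf(User("alice", 1), User(repo, 2))
}

suspend fun loadAll(
    repos: List<String>,
    updateResults: suspend (List<User>, completed: Boolean) -> Unit
) = coroutineScope {
    val channel = Channel<List<User>>()
    // Producers: each sends only its own per-repo result; no shared mutable state.
    for (repo in repos) {
        launch(Dispatchers.Default) {
            channel.send(fetchContributors(repo))
        }
    }
    // Single consumer: allUsers is local to this coroutine, so it needs no lock.
    var allUsers = emptyList<User>()
    repeat(repos.size) {
        allUsers = (allUsers + channel.receive()).aggregate()
        updateResults(allUsers, it == repos.lastIndex)
    }
}
```

Called as runBlocking { loadAll(listOf("a", "bb", "ccc")) { users, done -> println("$done: $users") } }, it reports three snapshots, and the last one is flagged as completed and contains every contributor, because only the consumer ever writes allUsers.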