Kotlin Flow - Some emitted events are not received when collecting

I am using MutableStateFlow. My flow's type is a sealed class with different states (Loading, Success, Error, etc.). The initial value of my flow is Empty:

private val _updateDepartmentsState = MutableStateFlow<DepartmentFetchState>(DepartmentFetchState.Empty) 

In my repository I emit different states. For example:

    suspend fun updateDepartments() {
        _updateDepartmentsState.emit(DepartmentFetchState.Loading)
        try {
            remoteDataSource.updateDepartments()
            // here some code
            _updateDepartmentsState.emit(DepartmentFetchState.Success(data))
        } catch (e: NetworkException) {
            _updateDepartmentsState.emit(DepartmentFetchState.Error)
        }
    }

My repository also exposes a read-only flow:

val updateDepartmentsState = _updateDepartmentsState.asStateFlow() 

In the view model I collect the flow via an interactor. My code inside the view model:

    updateDepartmentsState.emitAll(
        interactor
            .updateState // state flow (`updateDepartmentsState`) from the repository, via the interactor
            .map { state ->
                when (state) {
                    DepartmentFetchState.Loading -> {}
                    DepartmentFetchState.Error -> {}
                    ...
                }
            }.also {
                interactor.updateDepartments() // calls updateDepartments() from the repository, via the interactor
            }
    )

As I understand from the documentation, once we start collecting we should receive the initial value. But that doesn't happen. Moreover, I do not receive state DepartmentFetchState.Loading. I receive only last state - DepartmentFetchState.Success.

But the most interesting thing is that if I re-call the code from the view model (for example, when updating by swipe), then I get the DepartmentFetchState.Loading state, and then the DepartmentFetchState.Success state, as expected.

I don't understand why on the first call, the initial value that I set when initializing the flow and the DepartmentFetchState.Loading state are lost.

Please, help me(

3

There are 3 answers

0
ucMedia On

This is because updates to a StateFlow's value are always conflated. If values are posted faster than they are collected, the collector only gets the most recent one.
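
To make the conflation concrete, here is a minimal sketch assuming kotlinx.coroutines is on the classpath. The function name conflationDemo and the Int values are made up for illustration. The collector is already subscribed, yet three updates written back to back without a suspension point reach it as a single value:

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical demo: returns the values the collector actually observed.
fun conflationDemo(): List<Int> = runBlocking {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()

    // UNDISPATCHED runs the collector up to its first suspension,
    // so it receives the initial value 0 right away.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        state.collect { seen += it }
    }

    // No suspension point between these writes: on this single thread
    // the collector cannot run until we yield.
    state.value = 1
    state.value = 2
    state.value = 3

    yield()      // let the collector run; it reads only the latest value
    job.cancel() // a StateFlow collector never completes on its own
    seen
}

fun main() {
    println(conflationDemo())
}
```

The intermediate values 1 and 2 are overwritten before the collector gets a chance to read them.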

I think you have two options:

  1. Add a small delay after each update so that it is not skipped (not reliable).
  2. Use SharedFlow instead, which does not conflate values and, in its default configuration, delivers every emission to its active subscribers.
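
As a rough sketch of the second option (the name sharedFlowDemo is invented here, and kotlinx.coroutines is assumed): a MutableSharedFlow created with default arguments has no replay and no buffer, so emit suspends until the active subscriber has taken the value, and nothing is skipped. Note that a subscriber that starts late still misses earlier values unless replay is configured.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical demo: returns the values the subscriber observed.
fun sharedFlowDemo(): List<Int> = runBlocking {
    val events = MutableSharedFlow<Int>() // replay = 0, extraBufferCapacity = 0
    val seen = mutableListOf<Int>()

    // Subscribe before emitting; with no subscribers, emissions are simply dropped.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        events.collect { seen += it }
    }

    // Each emit suspends until the subscriber has received the value.
    events.emit(1)
    events.emit(2)
    events.emit(3)

    yield()
    job.cancel() // a SharedFlow collector never completes on its own
    seen
}

fun main() {
    println(sharedFlowDemo())
}
```
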
1
Sovathna Hong On

What you have described is the intended behavior of StateFlow.

Moreover, I do not receive state DepartmentFetchState.Loading. I receive only last state - DepartmentFetchState.Success.

This is because StateFlow is a state-holder observable flow that emits the current state and subsequent updates to its collectors. In your view model code, the also { } block runs while the argument to emitAll is being built, that is, before emitAll begins collecting. So by the time you start collecting the flow, your updateDepartments() call has already finished with DepartmentFetchState.Success. This means that from that moment on, the current state a new collector sees is DepartmentFetchState.Success.

And then:

But the most interesting thing is that if I re-call the code from the view model (for example, when updating by swipe), then I get the DepartmentFetchState.Loading state, and then the DepartmentFetchState.Success state, as expected.

When you re-call the code, you receive the states as expected because you are already collecting from the flow: as your updateDepartments() executes, it emits DepartmentFetchState.Loading and then DepartmentFetchState.Success, and the active collector receives each in turn.
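
The difference between the two orderings can be sketched as follows. FetchState, Repo, and both function names are simplified, hypothetical stand-ins for the question's classes, and delay(10) stands in for the network call. The first function mirrors the question (the update runs inside also { } before collection starts); the second subscribes first and only then triggers the update.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical stand-in for DepartmentFetchState.
sealed class FetchState {
    object Empty : FetchState()
    object Loading : FetchState()
    object Success : FetchState()
}

// Hypothetical stand-in for the repository.
class Repo {
    private val _state = MutableStateFlow<FetchState>(FetchState.Empty)
    val state = _state.asStateFlow()

    suspend fun update() {
        _state.emit(FetchState.Loading)
        delay(10) // assumption: stands in for the remote call
        _state.emit(FetchState.Success)
    }
}

// As in the question: update() completes before anything collects,
// so the first value a collector sees is the final state.
fun updateThenCollect(): List<FetchState> = runBlocking {
    val repo = Repo()
    repo.state.also { repo.update() }.take(1).toList()
}

// Subscribe first, then trigger the update.
fun collectThenUpdate(): List<FetchState> = runBlocking {
    val repo = Repo()
    val seen = mutableListOf<FetchState>()
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        repo.state.collect { seen += it }
    }
    repo.update() // the delay inside lets the collector observe Loading
    yield()       // let the collector observe Success
    job.cancel()
    seen
}

fun main() {
    println(updateThenCollect().map { it::class.simpleName })
    println(collectThenUpdate().map { it::class.simpleName })
}
```

In a view model, the same idea would usually mean launching the collection in one coroutine and calling the update from another, rather than chaining the call with also { }.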

0
RealityExpander On

You can add a yield() after the emit, which gives the collector a chance to run its collect block, like so:

    suspend fun updateDepartments() {
        _updateDepartmentsState.emit(DepartmentFetchState.Loading)
        try {
            remoteDataSource.updateDepartments()
            // here some code
            _updateDepartmentsState.emit(DepartmentFetchState.Success(data))

            yield() // <-- allows the collector to run after each update
        } catch (e: NetworkException) {
            _updateDepartmentsState.emit(DepartmentFetchState.Error)
        }
    }

I'm not sure why you would need to do this in your case, but it is useful when you want each emission to be processed in some way, so it does have a good use-case.

Just know that with this approach the collector is forced to process every intermediate value, possibly unnecessarily.
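
A small sketch of what yield() changes, assuming a single-threaded runBlocking context and a collector that is already subscribed (emitStates and the String states are invented for illustration). Without a suspension point between the two emissions, the intermediate state is conflated away; yielding right after the first emission lets the collector observe it.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical demo: returns the states the collector observed.
fun emitStates(yieldBetween: Boolean): List<String> = runBlocking {
    val state = MutableStateFlow("Empty")
    val seen = mutableListOf<String>()
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        state.collect { seen += it }
    }

    state.emit("Loading")
    if (yieldBetween) yield() // hand the thread to the collector
    state.emit("Success")

    yield()      // let the collector observe the final state
    job.cancel()
    seen
}

fun main() {
    println(emitStates(yieldBetween = false))
    println(emitStates(yieldBetween = true))
}
```

In the question's code a real suspending network call sits between the two emissions, which gives an active collector the same opportunity, so the yield() mainly matters when the emissions would otherwise run back to back.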