How can I copy results from Realm in a background thread while still listening for changes from the main thread?

I get the following exception when calling realm.copyFromRealm(results) (see the code snippet below):

java.lang.IllegalStateException: Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.

copyFromRealm is expensive, so I want to run it on a background thread.

Note: I can't obtain a Realm instance on a background thread because the addChangeListener function needs to run on the same thread. Furthermore, the addChangeListener function can't be called on a thread that doesn't have a looper, so I must use the main thread to add the listener. Therefore, I need to find a way to obtain the Realm instance on the main thread, add the listener there, and then perform copyFromRealm on a background thread.

Question: How can I copy results from Realm in a background thread while still listening for changes from the main thread?

Code:

override fun getUsersFlow(projectId: Int?): Flow<List<UserEntity>> {
    return callbackFlow {

        val realm = Realm.getDefaultInstance()

        val projectBasedQuery = realm.where(UserEntity::class.java).apply {
            if (projectId != null) equalTo("projectId", projectId)
        }

        var copyJob: Job? = null
        val findAllAsync = projectBasedQuery.findAllAsync()

        findAllAsync.addChangeListener { results ->

            copyJob = CoroutineScope(Dispatchers.Default).launch {
                val items = results.realm.copyFromRealm(results) // CRASH
                val sortedItems = items.sortedByDescending { it.createdTime } // Sort updated list by date
                trySend(sortedItems)
            }
        }

        awaitClose {
            findAllAsync.removeAllChangeListeners()
            copyJob?.cancel()
            if (!realm.isClosed) {
                realm.close()
            }
        }

    }.flowOn(Dispatchers.Main)
}

There is 1 answer

Answered by Andrew

Solution:

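The freeze() function is the key. It returns an immutable snapshot of the results that can be read from any thread. In the code below it is called inside the change listener, and the copy is then made on the background thread with freezedResults.realm.copyFromRealm(freezedResults).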

override fun getUsersFlow(projectId: Int?): Flow<List<UserEntity>> {
    return callbackFlow {

        val realm = Realm.getDefaultInstance()

        val projectBasedQuery = realm.where(UserEntity::class.java).apply {
            if (projectId != null) equalTo("projectId", projectId)
        }

        var copyJob: Job? = null
        val findAllAsync = projectBasedQuery.findAllAsync()

        findAllAsync.addChangeListener { results ->

            // freeze() returns an immutable snapshot that can be read on a background thread
            val freezedResults = results.freeze()

            copyJob = CoroutineScope(Dispatchers.Default).launch {
                // Use the frozen snapshot and its own Realm to copy the data on the background thread
                val items = freezedResults.realm.copyFromRealm(freezedResults)
                val sortedItems = items.sortedByDescending { it.createdTime } // Sort updated list by date
                trySend(sortedItems)
            }
        }

        awaitClose {
            findAllAsync.removeAllChangeListeners()
            copyJob?.cancel()
            if (!realm.isClosed) {
                realm.close()
            }
        }

    }.flowOn(Dispatchers.Main)
}

Preferable approach:

override fun getUsersFlow(projectId: Int?): Flow<List<UserEntity>> {
    // The use block ensures that the Realm instance is closed properly after its use
    return Realm.getDefaultInstance().use { realm ->
        realm.where(UserEntity::class.java).apply {
            if (projectId != null) equalTo("projectId", projectId)
        }.findAllAsync()
          .toFlow() // Converts the Realm results to Kotlin Flow
          .flowOn(Dispatchers.Main)
          .map { results ->
              // Copy on the background thread through the emitted results' own Realm;
              // the outer realm is confined to the thread that opened it and is closed by use
              results.realm.copyFromRealm(results)
                  .sortedByDescending { it.createdTime }
          }
          .flowOn(Dispatchers.Default)
    }
}
  • toFlow() utilizes the same freeze() function internally.
  • Make sure to close the Realm when you are finished with it. This can be done using the use block.
  • Don't worry about the flow stopping when you close the Realm. The toFlow() function takes care of opening its own Realm instance and closing it when necessary. So, you can use your Realm to create the flow, then close your Realm, and the flow will still keep going.
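
The two flowOn calls in the snippet above each affect only the operators upstream of them, which is why the query can stay on the main thread while the copy runs in the background. The following is a minimal sketch of that behaviour using plain kotlinx.coroutines and no Realm at all. The names stageThreads and fake-main are hypothetical, and a single-thread executor stands in for Dispatchers.Main, which is not available outside Android.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors

// Records the name of the thread each stage of the chain runs on.
fun stageThreads(): Map<String, String> {
    val seen = ConcurrentHashMap<String, String>()

    // Hypothetical stand-in for Dispatchers.Main: one dedicated, named thread.
    val fakeMain = Executors
        .newSingleThreadExecutor { r -> Thread(r, "fake-main") }
        .asCoroutineDispatcher()

    // use closes the dispatcher (and its thread) once the flow has completed.
    fakeMain.use { mainLike ->
        runBlocking {
            flow {
                // Plays the role of findAllAsync().toFlow()
                seen["upstream"] = Thread.currentThread().name
                emit(listOf(3, 1, 2))
            }
                .flowOn(mainLike)            // applies to the flow { } builder above
                .map { items ->
                    // Plays the role of copyFromRealm + sorting
                    seen["map"] = Thread.currentThread().name
                    items.sortedDescending()
                }
                .flowOn(Dispatchers.Default) // applies to map, up to the previous flowOn
                .collect { seen["collect"] = Thread.currentThread().name }
        }
    }
    return seen
}

fun main() {
    stageThreads().forEach { (stage, thread) -> println("$stage ran on $thread") }
}
```

Under these assumptions, the upstream stage should report the fake-main thread, the map stage a DefaultDispatcher worker thread, and collect the thread of the caller.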

Refactored:

fun <T : RealmModel, R> getRealmFlow(
   realmClass: Class<T>,
   queryBlock: (RealmQuery<T>) -> RealmQuery<T> = { it },
   mapAsync: (RealmResults<T>) -> R
): Flow<R> {
   return Realm.getDefaultInstance().use { defaultInstance ->
       defaultInstance.where(realmClass)
           .let(queryBlock)
           .findAllAsync()
           .toFlow().flowOn(Dispatchers.Main)
           .map { mapAsync(it) }
           .flowOn(Dispatchers.Default)
   }
}

override fun getUsersFlow(projectId: Int?): Flow<List<UserEntity>> {
   return getRealmFlow(
       realmClass = UserEntity::class.java,
       queryBlock = { query -> projectId?.let { query.equalTo("projectId", it) } ?: query },
       mapAsync = { it.realm.copyFromRealm(it).sortedByDescending { entity -> entity.createdTime } }
   )
}
