I am trying to launch a coroutine inside my PagingSource to watch how long the paging source has been trying to load my data. The only problem is that my coroutine is still collecting data even after I have stopped my shopPagingWatcher Flow. Because of this, it throws IOException("No Internet Exception") even when it should not.
I am launching a coroutine because watching the state should not block the main flow of my paging source.
PagingSource
class ShopRemoteMediator @Inject constructor(
    private val db: FirebaseFirestore,
    private val shopPagingWatcher: ShopPagingWatcher,
) : PagingSource<QuerySnapshot, Product>() {
    @InternalCoroutinesApi
    override suspend fun load(params: LoadParams<QuerySnapshot>): LoadResult<QuerySnapshot, Product> {
        return try {
            // Launch Async Coroutine, Observe State, throw IO Exception when not loaded within 5 seconds
            shopPagingWatcher.start()
            CoroutineScope(Dispatchers.IO).launch {
                shopPagingWatcher.observeMaxTimeReached().collect { maxTimeReached ->
                    if (maxTimeReached) {
                        Timber.d("Mediator failed")
                        throw IOException("No Internet Exception")
                    }
                }
            }
            val currentPage = params.key ?: db.collection(FIREBASE_PRODUCTS)
                .limit(SHOP_LIST_LIMIT)
                .get()
                .await()
            val lastDocumentSnapShot = currentPage.documents[currentPage.size() - 1]
            val nextPage = db.collection(FIREBASE_PRODUCTS)
                .limit(SHOP_LIST_LIMIT)
                .startAfter(lastDocumentSnapShot)
                .get()
                .await()
            // When PagingSource is here, it successfully loaded currentPage and nextPage, therefore stop Watcher
            Timber.d("Mediator Sucessfull")
            shopPagingWatcher.stop()
            LoadResult.Page(
                data = currentPage.toObjects(),
                prevKey = null,
                nextKey = nextPage
            )
        } catch (e: Exception) {
            // IOException should be caught here, but it is not! The app crashed instead!
            Timber.d("Mediator Exception ist $e")
            LoadResult.Error(e)
        }
    }
}
ShopPagingWatcher
@Singleton
class ShopPagingWatcher @Inject constructor() : Workwatcher()
Abstract WorkWatcher
abstract class Workwatcher {
    private companion object {
        private val dispatcher = Dispatchers.IO
        private var timeStamp by Delegates.notNull<Long>()
        private var running = false
        private var manuallyStopped = false
        private var finished = false
        private const val maxTime: Long = 5000000000L
    }

    // Push the current timestamp, set running to true
    // I don't know if it is necessary to use "synchronized"
    @InternalCoroutinesApi
    fun start() = synchronized(dispatcher) {
        timeStamp = System.nanoTime()
        running = true
        manuallyStopped = false
        finished = false
    }

    // Manually stop the WorkerHelper
    // I don't know if it is necessary to use "synchronized"
    @InternalCoroutinesApi
    fun stop() = synchronized(dispatcher) {
        running = false
        manuallyStopped = true
        finished = true
        Timber.d("Mediator stopped")
    }

    // Function that observes the time
    fun observeMaxTimeReached(): Flow<Boolean> = flow {
        // Check if maxTime is not passed with → (System.nanoTime() - timeStamp) <= maxTime
        while (running && !finished && !manuallyStopped && (System.nanoTime() - timeStamp) <= maxTime) {
            emit(false)
            Timber.d("Currenttime is smaller, everything fine")
        }
        // This will be executed only when the Worker is running longer than maxTime
        if (!manuallyStopped && !finished) {
            Timber.d("Currenttime bigger, yikes. Stop worker")
            emit(true)
            running = false
            finished = true
            return@flow
        } else if (finished || manuallyStopped) {
            return@flow
        }
    }.flowOn(dispatcher)
}
How should I change the coroutine inside my PagingSource to achieve my goal? Timber.d("Mediator stopped") does get called.
I appreciate any help, thank you.
Do you need to measure duration? Time is already passing everywhere; you don't need another thread or coroutine to track it. There's measureNanoTime {} that measures how long a code block took to execute.
Do you need to apply a timeout inside a suspending function? There's withTimeout exactly for that. Example:
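A minimal sketch of the timeout approach, assuming a plain suspending call stands in for the Firestore query. The names fetchPage, Outcome and loadPage are hypothetical and only illustrate the shape; in the real PagingSource the withTimeout block would wrap the get().await() calls, and the two branches would map to LoadResult.Page and LoadResult.Error. The delays and timeouts are illustrative values.

```kotlin
import java.io.IOException
import kotlin.system.measureNanoTime
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Hypothetical stand-in for LoadResult.Page / LoadResult.Error.
sealed class Outcome {
    data class Page(val items: List<String>) : Outcome()
    data class Error(val cause: Exception) : Outcome()
}

// Hypothetical stand-in for the Firestore query; suspends for delayMs.
suspend fun fetchPage(delayMs: Long): List<String> {
    delay(delayMs)
    return listOf("product-1", "product-2")
}

// Loads a page, turning a timeout into an IOException-backed error.
// No extra coroutine is launched: withTimeout cancels the block itself.
suspend fun loadPage(delayMs: Long, timeoutMs: Long): Outcome =
    try {
        Outcome.Page(withTimeout(timeoutMs) { fetchPage(delayMs) })
    } catch (e: TimeoutCancellationException) {
        Outcome.Error(IOException("No Internet Exception", e))
    }

fun main(): Unit = runBlocking {
    println(loadPage(delayMs = 10, timeoutMs = 1_000))   // finishes in time
    println(loadPage(delayMs = 2_000, timeoutMs = 100))  // exceeds the timeout

    // Measuring duration needs no watcher either.
    val elapsedNanos = measureNanoTime { loadPage(delayMs = 10, timeoutMs = 1_000) }
    println("load took ${elapsedNanos / 1_000_000} ms")
}
```

Because the timeout is raised inside the same coroutine that runs load, the exception reaches the surrounding try/catch. An exception thrown inside a separately launched CoroutineScope(...).launch { } does not propagate to the caller's try block, which is consistent with the crash described in the question.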