Having some issues testing a flow that emits infinite messages:
Class:
class MessageManager(
dispatcher: CoroutineDispatcher = Dispatchers.IO,
) {
private val messageFlow = flow {
while (true) {
delay(60_000) // Emit every minute
try {
val messages = service.getMessages().await()
emit(messages)
} catch (error: Exception) {
Log.e("Messages", "Error fetching messages: $error")
}
}
}.flowOn(dispatcher)
.shareIn(scope, SharingStarted.Lazily, 1)
val messages: StateFlow<List<Message>> = messageFlow.stateIn(
CoroutineScope(Dispatchers.Main),
SharingStarted.WhileSubscribed(),
emptyList()
)
}
Test:
@OptIn(ExperimentalCoroutinesApi::class)
@RunWith(RobolectricTestRunner::class)
class MessageManagerTest {
private val testScheduler = TestCoroutineScheduler()
private val application: Application = mockk(relaxed = true)
private val testDispatcher = StandardTestDispatcher(testScheduler)
private val mockMessages = listOf(Message("id1", "Test message 1", listOf("mock")))
@Before
fun setUp() {
Dispatchers.setMain(testDispatcher)
}
@After
fun tearDown() {
Dispatchers.resetMain()
}
@Test
fun `messages emits expected data`() = runTest {
// Initialize BroadcastManager
val messageManager = MessageManager(application, testDispatcher)
// Test the messages flow with Turbine
messageManager.messages.test {
// Assert initial state
assertTrue(awaitItem().isEmpty())
// Advance time and assert next emission
advanceTimeBy(60_000)
assertEquals(mockMessages, awaitItem())
cancelAndIgnoreRemainingEvents()
}
}
}
My expectation is that I should only receive one emitted list of messages before the remaning events gets cancelled, but that is not the case. Currently the messageFlow never stops emitting messages. Am I missing something obvious here?