Having some issues testing a flow that emits infin...
# coroutines
h
Having some issues testing a flow that emits infinite messages: Class:
Copy code
class MessageManager(
    dispatcher: CoroutineDispatcher = <http://Dispatchers.IO|Dispatchers.IO>,
) {
    private val messageFlow = flow {
        while (true) {
            delay(60_000)  // Emit every minute
            try {
                val messages = service.getMessages().await()
                emit(messages)
            } catch (error: Exception) {
                Log.e("Messages", "Error fetching messages: $error")
            }
        }
    }.flowOn(dispatcher)
        .shareIn(scope, SharingStarted.Lazily, 1)

    val messages: StateFlow<List<Message>> = messageFlow.stateIn(
        CoroutineScope(Dispatchers.Main),
        SharingStarted.WhileSubscribed(),
        emptyList()
    )
}
Test:
Copy code
@OptIn(ExperimentalCoroutinesApi::class)
@RunWith(RobolectricTestRunner::class)
class MessageManagerTest {

    private val testScheduler = TestCoroutineScheduler()
    private val application: Application = mockk(relaxed = true)
    private val testDispatcher = StandardTestDispatcher(testScheduler)
    private val mockMessages = listOf(Message("id1", "Test message 1", listOf("mock")))

    @Before
    fun setUp() {
        Dispatchers.setMain(testDispatcher)
    }

    @After
    fun tearDown() {
        Dispatchers.resetMain()
    }

    @Test
    fun `messages emits expected data`() = runTest {
        // Initialize BroadcastManager
        val messageManager = MessageManager(application, testDispatcher)

        // Test the messages flow with Turbine
        messageManager.messages.test {
            // Assert initial state
            assertTrue(awaitItem().isEmpty())

            // Advance time and assert next emission
            advanceTimeBy(60_000)

            assertEquals(mockMessages, awaitItem())

            cancelAndIgnoreRemainingEvents()
        }
    }
}
My expectation is that I should only receive one emitted list of messages before the remaning events gets cancelled, but that is not the case. Currently the messageFlow never stops emitting messages. Am I missing something obvious here?
h
Yes, you catch the CancellationException.
h
Like this in the messageFlow?
Copy code
private val messageFlow = flow {
    while (true) {
        delay(60_000)  // Emit every minute
        try {
            val messages = service.getMessages().await()
            emit(messages)
        } catch (error: CancellationException) {
            Log.e("Messages", "Error fetching messages: $error")
        } catch (error: Exception) {
            Log.e("Messages", "Error fetching messages: $error")
        }
    }
}.flowOn(dispatcher)
    .shareIn(scope, SharingStarted.Lazily, 1)
h
Nope, you must rethrow the CE.
p
Really you should avoid catching the cancellation and use while(isActive)
h
Ok, now throwing error:
Copy code
private val messageFlow = flow {
        while (true) {
            delay(60_000)  // Emit every minute
            try {
                val messages = service.getMessages().await()
                emit(messages)
            } catch (error: CancellationException) {
                throw error
            } catch (error: Exception) {
                Log.e("Messages", "Error fetching messages: $error")
            }
        }
    }.flowOn(dispatcher)
        .shareIn(scope, SharingStarted.Lazily, 1)
From what I can tell the test is still not able to actually cancel anything, still runs until OOM.
I did some modifications. From what I can tell here it only collects the messages twice, as expected with a 120s advance. BUT the messages are still being emitted in the actual messageFlow until it eventually OOM.
Copy code
@Test
    fun test() = runTest {
        manager = MessageManager(application, testDispatcher)

        val messages = mutableListOf<List<Message>>()

        val job = launch {
            manager.messages.collect {
                println("collecting message")
                messages.add(it)
            }
        }

        advanceTimeBy(120_000)

        job.cancel()
    }
Managed to cancel the messageFlow(hot flow) by explicitly cancelling the scope created in the Manager-class. In the actual usage of the class the messageFlow should never be cancelled, so I guess I will need the method to stop it just for the sake of testing.
d
Copy code
class MessageManager(
    dispatcher: CoroutineDispatcher = <http://Dispatchers.IO|Dispatchers.IO>,
    scope: CoroutineScope = CoroutineScope(Dispatchers.Main)
) {
    val messages: StateFlow<List<Message>> = messageFlow.stateIn(
        scope,
        SharingStarted.WhileSubscribed(),
        emptyList()
    )
}
Then, in the test,
Copy code
val messageManager = MessageManager(testDispatcher, backgroundScope)
Also,
advanceTimeBy
is heavily discouraged,
delay(120.seconds)
is the preferred solution.
h
I see, thanks. What are the main differences by using delay instead?
d
• It's a
suspend
function, so it won't block the thread; • When we introduce new features to the test framework, they may not interoperate with
advanceTimeBy
well, but are guaranteed to work with `delay`; • One less thing to learn for anyone who reads the code; More: https://github.com/Kotlin/kotlinx.coroutines/issues/3919
👍 1