# coroutines
Hey. Not sure if I should post it here or on #android - trying here first. I'm trying to write an integration test for my class that consumes a `SharedFlow` and puts the received messages in Android's SQLite database via Room. I run the tests on Robolectric, since an Android runtime is required to access the real database in tests. A simplified version of what I'm trying to test looks something like this:
```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.withContext

class Collector(
    private val inboundMessages: SharedFlow<InboundMessage>, // a SharedFlow with buffer = 0, replay = 0, BufferOverflow.SUSPEND
    private val repository: Repository,
    private val collectingDispatcher: CoroutineDispatcher,
) {
    suspend fun collect(): Unit = withContext(collectingDispatcher) {
        inboundMessages
            .collect {
                try {
                    log.info("Processing message: $it")
                    processMessage(it)
                } catch (e: Exception) {
                    log.error("Error while processing message: $it", e)
                }
            }
    }

    suspend fun processMessage(message: InboundMessage) {
        repository.insertOrUpdate(message.firstPieceOfData) // uses Room under the hood; this method is suspending
        repository.insertOrUpdate(message.secondPieceOfData) // uses Room under the hood; this method is suspending
    }
}
```
When my test creates `Collector`, it passes `StandardTestDispatcher` as `collectingDispatcher`. The test method looks like this:
```kotlin
val inboundMessages = MutableSharedFlow<InboundMessage>()

val repository = Repository(...) // has a real SQLite DB accessed via Room under the hood

val testDispatcher = StandardTestDispatcher()

val collector = Collector(inboundMessages, repository, testDispatcher)

@Test
fun `the message should be stored in the repository`() = runTest(testDispatcher) {

    // Also tried a non-background scope and cancelling this coroutine manually before
    // the end of the test, but the result was exactly the same.
    backgroundScope.launch {
        collector.collect()
    }

    testScheduler.runCurrent() // Makes sure the subscription is active

    inboundMessages.emit(
        InboundMessage(
            firstPieceOfData = Message(id = "id-1"),
            secondPieceOfData = Message(id = "id-2"),
        )
    )

    testScheduler.runCurrent() // I hoped this would give the collector enough time to process the message

    // But unfortunately the test resumes and runs the assertion below while Collector is still in the middle of
    // processing the emitted item.
    repository.getAllIds() shouldBe listOf("id-1", "id-2")
}
```
The logs I added in a few places (e.g. in the `catch` block of the `Collector`) clearly indicate that the `Collector` is still in the middle of processing the item (inside the `processMessage` method) when the test resumes, runs the assertion and cancels the launched coroutine that runs `collector.collect()`.
```
Error while processing message: InboundMessage(...blablabla...)
kotlinx.coroutines.JobCancellationException: Job was cancelled; job=ReportingSupervisorJob{Cancelling}@37627a1d
	at kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:1555)
	at kotlinx.coroutines.CoroutineScopeKt.cancel(CoroutineScope.kt:287)
	at kotlinx.coroutines.CoroutineScopeKt.cancel$default(CoroutineScope.kt:285)
	at kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt$runTest$2$1.invokeSuspend(TestBuilders.kt:365)
```
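A side note on why this trace shows up under "Error while processing message": `JobCancellationException` is a `CancellationException`, which is itself an `Exception`, so a `catch (e: Exception)` around a suspending call also catches cancellation. A minimal sketch of one common way to keep cancellation out of the error log is below. `processSafely` and `loggedErrors` are hypothetical names used only for illustration; they are not part of the `Collector` above.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a logger: collects the error messages that would be logged.
val loggedErrors = mutableListOf<String>()

// Hypothetical helper: runs `block`, records ordinary failures, lets cancellation propagate.
suspend fun processSafely(block: suspend () -> Unit) {
    try {
        block()
    } catch (e: CancellationException) {
        throw e // cancellation is not a processing error, so rethrow it
    } catch (e: Exception) {
        loggedErrors += "Error while processing message: $e"
    }
}

fun main() = runBlocking {
    // Cancelled while suspended: nothing should be recorded as an error.
    val job = launch { processSafely { delay(10_000) } }
    delay(100)
    job.cancelAndJoin()

    // A genuine failure: this one should be recorded.
    processSafely { error("boom") }

    println(loggedErrors)
}
```

With this shape, only the genuine failure ends up in `loggedErrors`; the cancelled call leaves no entry. This does not change the timing problem described here, it only keeps the cancellation from being reported as a processing error.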
It's also non-deterministic, e.g. sometimes the `firstPieceOfData` is saved before the assertion runs, but sometimes it isn't:
```
Missing elements from index 0
expected:<["id-1", "id-2"]> but was:<[]>
```
or
```
Missing elements from index 1
expected:<["id-1", "id-2"]> but was:<["id-1"]>
```
Therefore, I wonder:
1. Why doesn't `runCurrent()` wait until the message passed through the `SharedFlow` is fully processed and the data is stored in the `Repository`?
2. Does it depend on Room's internal implementation of the `suspend fun`s used for putting the data into the DB? E.g. assuming they run the DB transactions on a dedicated thread/dispatcher (which is quite probable). But I thought `runCurrent()` should still work, since our `StandardTestDispatcher` still has some work to do.
3. How can/should I test this?
OK, I think I found a workaround by using a combination of the following in the tests:
1. `InstantTaskExecutorRule`
2. `allowMainThreadQueries()`
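A rough sketch of what that combination might look like in a Robolectric test class. `AppDatabase` is a hypothetical `RoomDatabase` subclass (the real database class is not shown in this thread), and using `Room.inMemoryDatabaseBuilder` is likewise an assumption made for the example; the fragment will not compile without that class.

```kotlin
import android.content.Context
import androidx.arch.core.executor.testing.InstantTaskExecutorRule
import androidx.room.Room
import androidx.test.core.app.ApplicationProvider
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.runner.RunWith
import org.robolectric.RobolectricTestRunner

@RunWith(RobolectricTestRunner::class)
class CollectorTest {

    // Replaces the Architecture Components background executor with one that runs tasks synchronously.
    @get:Rule
    val instantTaskExecutorRule = InstantTaskExecutorRule()

    // AppDatabase is hypothetical; substitute the project's own RoomDatabase subclass.
    private lateinit var database: AppDatabase

    @Before
    fun setUp() {
        database = Room.inMemoryDatabaseBuilder(
            ApplicationProvider.getApplicationContext<Context>(),
            AppDatabase::class.java,
        )
            .allowMainThreadQueries() // test-only: permits queries on the calling (test) thread
            .build()
    }

    @After
    fun tearDown() {
        database.close()
    }
}
```

The idea, as far as I can tell, is that Room's work then runs on the test thread instead of a background executor, so nothing is still in flight on another thread when `runCurrent()` returns. `allowMainThreadQueries()` is generally meant for tests only.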