azabost
10/24/2023, 12:26 PMSharedFlow
and puts the received messages in Android's SQLite database via Room. I run the tests on Robolectric since Android runtime is required to access the real database in tests.
A simplified version of what I'm trying to test would look something like:
class Collector(
private val inboundMessages: SharedFlow<InboundMessages>, // a SharedFlow with buffer = 0, replay = 0, BufferOverflow.SUSPEND
private val repository: Repository,
private val collectingDispatcher: CoroutineDispatcher,
) {
suspend fun collect(): Unit = withContext(collectingDispatcher) {
inboundMessages
.collect {
try {
log.info("Processing message: $it")
processMessage(it)
} catch (e: Exception) {
log.error("Error while processing message: $it", e)
}
}
}
suspend fun processMessage(message: InboundMessage) {
repository.insertOrUpdate(message.firstPieceOfData) // uses Room under the hood; this method is suspending
repository.insertOrUpdate(message.secondPieceOfData) // uses Room under the hood; this method is suspending
}
}
When my test creates Collector
, it passes StandardTestDispatcher
as collectingDispatcher
The test method looks like this:
val inboundMessages = MutableSharedFlow<InboundMessage>()
val repository = Repository(...) // has a real SQLite DB accessed via Room under the hood
val testDispatcher = StandardTestDispatcher()
val collector = Collector(inboundMessages, repository, testDispatcher)
@Test
fun `the message should be stored in the repository`() = runTest(testDispatcher) {
// Also tried non-background scope and cancelling this coroutine manually before
// the end of the test but the result was exactly the same.
backgroundScope.launch {
collector.collect()
}
testScheduler.runCurrent() // Makes sure the subscription is active
inboundMessages.emit(
InboundMessage(
firstPieceOfData = Message(id = "id-1"),
secondPieceOfData = Message(id = "id-2"),
)
)
testScheduler.runCurrent() // I hoped this will ensure the collector has enough time to process the message
// But unfortunately this code resumes and runs the assertion below while Collector is still in the middle of
// processing the emitted item.
repository.getAllIds() shouldBe listOf("id-1", "id-2")
}
The logs I added in a few places (e.g. in the catch
block of the Collector
) clearly indicate that the Collector
is still in the middle of processing the item (inside processMessage
method) when the test resumes, runs the assertion and cancels the launched coroutine with collector.collect()
.
Error while processing message: InboundMessage(...blablabla...)
kotlinx.coroutines.JobCancellationException: Job was cancelled; job=ReportingSupervisorJob{Cancelling}@37627a1d
at kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:1555)
at kotlinx.coroutines.CoroutineScopeKt.cancel(CoroutineScope.kt:287)
at kotlinx.coroutines.CoroutineScopeKt.cancel$default(CoroutineScope.kt:285)
at kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt$runTest$2$1.invokeSuspend(TestBuilders.kt:365)
It's also non-deterministic, e.g. sometimes the firstPieceOfData
is saved before the assertion runs, but sometimes it doesn't:
Missing elements from index 0
expected:<["id-1", "id-2"]> but was:<[]>
or
Missing elements from index 1
expected:<["id-1", "id-2"]> but was:<["id-1"]>
---
Therefore, I wonder:
1. Why doesn't runCurrent()
suspend until the message passed through SharedFlow
is fully processed and the data is stored in Repository
?
2. Does it depend on the Room's internal implementation of their `suspend fun`s used for putting the data into DB? E.g. assuming they invoke the DB transactions on a dedicated thread/dispatcher (which is quite probable). But I thought runCurrent()
should still work since our StandardTestDispatcher
still has some work to do.
3. How can/should I test this?azabost
10/24/2023, 2:19 PM