https://kotlinlang.org logo
#coroutines
Title
# coroutines
a

Arjan van Wieringen

06/25/2022, 7:40 PM
How do I test the following?
Copy code
@Test
    fun distributedTest2() = runTest {
        repeat(1000) {
            val (state, updates) = MergeableValue("Bar", Instant.fromEpochMilliseconds(0)).distribute()
            // state is mutablestateflow, updates is mutablesharedflow that also updates state
            
            val otherSource = uuid()
            updates.emit(DistributedMergeable.Update(otherSource, MergeableValue("Baz", Instant.fromEpochMilliseconds(1))))
            updates.emit(DistributedMergeable.Update(otherSource, MergeableValue("Bat", Instant.fromEpochMilliseconds(2))))
            
            /* What do do here to make sure I can assert `state` */
            assertEquals("Bat", state.value.value)
        }
    }
Basically I have a MutableStateFlow
state
and a MutableSharedFlow
updates
. By emitting into
updates
I update, among other stuff, also the
state
. However, for the life of me I can not see how to test this. In an application this works as expected. Basically this is the
distribute()
function:
Copy code
data class DistributedMergeable<T : Mergeable<T>>(
    val states: MutableStateFlow<T>,
    val updates: MutableSharedFlow<Update<T>>
) {
    data class Update<T : Mergeable<T>>(val source: UUID, val value: T)
}

fun <T : Mergeable<T>> T.distribute(
    updates: MutableSharedFlow<DistributedMergeable.Update<T>>,
    scope: CoroutineScope = CoroutineScope(EmptyCoroutineContext)
): DistributedMergeable<T> {
    val source = uuid()
    val states = MutableStateFlow(this)

    states.onEach { newState ->
        println("$newState")
        updates.emit(DistributedMergeable.Update(source, newState))
    }.launchIn(scope)

    updates.onEach { update ->
        if (update.source == source) {
            println("Same source: $update")
            return@onEach
        }
        if (update.value == states.value) {
            println("Same state: $update")
            return@onEach
        }
        val merged = states.value.merge(update.value)
        println("Merged: $update -> $merged")
        states.value = merged
    }.launchIn(scope)

    return DistributedMergeable(states, updates)
}
j

Joffrey

06/25/2022, 7:53 PM
Why repeat 1000 times the same test? Your test should instead be made deterministic by injecting the test dispatcher from
runTest
, don't repeat and rely on chance and likelyhood
n

Nick Allen

06/25/2022, 8:04 PM
To answer the commented question: If you want to wait for all the coroutines to finish their work, use
TestScope.runCurrent
.
j

Joffrey

06/25/2022, 8:05 PM
It is often wrong to initialize a scope like this as a default argument. Doing this prevents you from cancelling this scope. The caller of
distribute()
should provide the scope, or the function should be suspend, otherwise you break structured concurrency (unless you create some sort of closeable object that holds a reference to the scope, which is not the case here).
👍 1
1
If you pass the scope that you get from
runTest
to
distribute()
, you'll be able to make use of the test dispatcher's facilities like @Nick Allen is mentioning here. In particular,
advanceUntilIdle
would be useful to you here
n

Nick Allen

06/25/2022, 8:13 PM
You
waitUntil
in the earlier deleted post's thread. That is using the test clock which always just advances immediately and because of a bug in your logic, it exits normally instead of throwing. The code you are trying to test is using a real dispatcher completely unrelated to the TestScope using other threads. So it's just a race condition on whether the predicate check loop or the tested code finishes first. Like @Joffrey said, pass the test scope in.
a

Arjan van Wieringen

06/26/2022, 6:20 AM
Thanks for the replies! I'll pas the test scope in then. @Joffrey I am repeating 1000 times because I was seeing non deterministic behavior and as such this is why I was opening this thread. I still keep stumbling over the basic coroutine issues it seems :(
Okay, I've got it working now. I've exposed the Job as well so I can cancel it in order to stop the indefinite suspending of the hot flows:
Copy code
@Test
    fun distributedTest2() = runTest {
        val (state, updates, job) = MergeableValue("Bar", Instant.fromEpochMilliseconds(0)).distribute(this)
        val otherSource = uuid()
        runCurrent()
        launch {
            updates.emit(
                DistributedMergeable.Update(
                    otherSource,
                    MergeableValue("Baz", Instant.fromEpochMilliseconds(1))
                )
            )
            updates.emit(
                DistributedMergeable.Update(
                    otherSource,
                    MergeableValue("Bat", Instant.fromEpochMilliseconds(2))
                )
            )
        }
        runCurrent()
        assertEquals("Bat", state.value.value)
        job.cancel()
    }
The only issue I have so far is the fact that I need to run
runCurrent
at the top. Otherwise the updates aren't received, which is I think because the Job launching those collectors hasn't been running yet:
Copy code
fun <T : Mergeable<T>> T.distribute(
    updates: MutableSharedFlow<DistributedMergeable.Update<T>>,
    scope: CoroutineScope
): DistributedMergeable<T> {
    val source = uuid()
    val states = MutableStateFlow(this)

    val job = scope.launch {
        states.onEach { newState ->
            updates.emit(DistributedMergeable.Update(source, newState))
        }.launchIn(this)

        updates.onEach { update ->
            println(update)
            if (update.source == source) {
                println("Same source: $update")
                return@onEach
            }
            if (update.value == states.value) {
                println("Same state: $update")
                return@onEach
            }
            val merged = states.value.merge(update.value)
            println("Merged: $update -> $merged")
            states.value = merged
        }.launchIn(this)
    }
    return DistributedMergeable(states, updates, job)
}
Is there a way to make sure these collectors are running before returning the
DistributedMergeable
? I tried something like this:
Copy code
// await start
    scope.launch {
        while (job.children.any { !it.isActive }) {
            delay(1)
        }
    }.join()
But that feels wrong.
7 Views