Arjan van Wieringen
06/25/2022, 7:40 PM@Test
fun distributedTest2() = runTest {
repeat(1000) {
val (state, updates) = MergeableValue("Bar", Instant.fromEpochMilliseconds(0)).distribute()
// state is mutablestateflow, updates is mutablesharedflow that also updates state
val otherSource = uuid()
updates.emit(DistributedMergeable.Update(otherSource, MergeableValue("Baz", Instant.fromEpochMilliseconds(1))))
updates.emit(DistributedMergeable.Update(otherSource, MergeableValue("Bat", Instant.fromEpochMilliseconds(2))))
/* What do do here to make sure I can assert `state` */
assertEquals("Bat", state.value.value)
}
}
Basically I have a MutableStateFlow state
and a MutableSharedFlow updates
. By emitting into updates
I update, among other stuff, also the state
. However, for the life of me I can not see how to test this. In an application this works as expected.
Basically this is the distribute()
function:
data class DistributedMergeable<T : Mergeable<T>>(
val states: MutableStateFlow<T>,
val updates: MutableSharedFlow<Update<T>>
) {
data class Update<T : Mergeable<T>>(val source: UUID, val value: T)
}
fun <T : Mergeable<T>> T.distribute(
updates: MutableSharedFlow<DistributedMergeable.Update<T>>,
scope: CoroutineScope = CoroutineScope(EmptyCoroutineContext)
): DistributedMergeable<T> {
val source = uuid()
val states = MutableStateFlow(this)
states.onEach { newState ->
println("$newState")
updates.emit(DistributedMergeable.Update(source, newState))
}.launchIn(scope)
updates.onEach { update ->
if (update.source == source) {
println("Same source: $update")
return@onEach
}
if (update.value == states.value) {
println("Same state: $update")
return@onEach
}
val merged = states.value.merge(update.value)
println("Merged: $update -> $merged")
states.value = merged
}.launchIn(scope)
return DistributedMergeable(states, updates)
}
Joffrey
06/25/2022, 7:53 PMrunTest
, don't repeat and rely on chance and likelyhoodNick Allen
06/25/2022, 8:04 PMTestScope.runCurrent
.Joffrey
06/25/2022, 8:05 PMdistribute()
should provide the scope, or the function should be suspend, otherwise you break structured concurrency (unless you create some sort of closeable object that holds a reference to the scope, which is not the case here).runTest
to distribute()
, you'll be able to make use of the test dispatcher's facilities like @Nick Allen is mentioning here. In particular, advanceUntilIdle
would be useful to you hereNick Allen
06/25/2022, 8:13 PMwaitUntil
in the earlier deleted post's thread. That is using the test clock which always just advances immediately and because of a bug in your logic, it exits normally instead of throwing. The code you are trying to test is using a real dispatcher completely unrelated to the TestScope using other threads. So it's just a race condition on whether the predicate check loop or the tested code finishes first.
Like @Joffrey said, pass the test scope in.Arjan van Wieringen
06/26/2022, 6:20 AM@Test
fun distributedTest2() = runTest {
val (state, updates, job) = MergeableValue("Bar", Instant.fromEpochMilliseconds(0)).distribute(this)
val otherSource = uuid()
runCurrent()
launch {
updates.emit(
DistributedMergeable.Update(
otherSource,
MergeableValue("Baz", Instant.fromEpochMilliseconds(1))
)
)
updates.emit(
DistributedMergeable.Update(
otherSource,
MergeableValue("Bat", Instant.fromEpochMilliseconds(2))
)
)
}
runCurrent()
assertEquals("Bat", state.value.value)
job.cancel()
}
The only issue I have so far is the fact that I need to run runCurrent
at the top. Otherwise the updates aren't received, which is I think because the Job launching those collectors hasn't been running yet:
fun <T : Mergeable<T>> T.distribute(
updates: MutableSharedFlow<DistributedMergeable.Update<T>>,
scope: CoroutineScope
): DistributedMergeable<T> {
val source = uuid()
val states = MutableStateFlow(this)
val job = scope.launch {
states.onEach { newState ->
updates.emit(DistributedMergeable.Update(source, newState))
}.launchIn(this)
updates.onEach { update ->
println(update)
if (update.source == source) {
println("Same source: $update")
return@onEach
}
if (update.value == states.value) {
println("Same state: $update")
return@onEach
}
val merged = states.value.merge(update.value)
println("Merged: $update -> $merged")
states.value = merged
}.launchIn(this)
}
return DistributedMergeable(states, updates, job)
}
Is there a way to make sure these collectors are running before returning the DistributedMergeable
?
I tried something like this:
// await start
scope.launch {
while (job.children.any { !it.isActive }) {
delay(1)
}
}.join()
But that feels wrong.