Slackbot
06/25/2022, 9:17 AMArjan van Wieringen
06/25/2022, 10:51 AMdata class DistributedMergeable<T : Mergeable<T>>(
val states: MutableStateFlow<T>,
val updates: MutableSharedFlow<Update<T>>
) {
data class Update<T : Mergeable<T>>(val source: UUID, val value: T)
}
fun <T : Mergeable<T>> T.distribute(
updates: MutableSharedFlow<DistributedMergeable.Update<T>>,
scope: CoroutineScope = CoroutineScope(EmptyCoroutineContext)
): DistributedMergeable<T> {
val source = uuid()
val states = MutableStateFlow(this)
states.onEach { newState ->
println("$newState")
updates.emit(DistributedMergeable.Update(source, newState))
}.launchIn(scope)
updates.onEach { update ->
if (update.source == source) {
println("Same source: $update")
return@onEach
}
if (update.value == states.value) {
println("Same state: $update")
return@onEach
}
val merged = states.value.merge(update.value)
println("Merged: $update -> $merged")
states.value = merged
}.launchIn(scope)
return DistributedMergeable(states, updates)
}
fun <T : Mergeable<T>> T.distribute(
scope: CoroutineScope = CoroutineScope(EmptyCoroutineContext)
): DistributedMergeable<T> = distribute(MutableSharedFlow(), scope)
Basically, Mergeable
provides a merge operation. And I distribute state updates in a sharedflow, and merge if they are different.
I have a basic test:
@Test
fun distributedTest2() = runTest {
val (state, updates) = MergeableValue("Bar", Instant.fromEpochMilliseconds(0)).distribute()
val otherSource = uuid()
updates.emit(DistributedMergeable.Update(otherSource, MergeableValue("Baz", Instant.fromEpochMilliseconds(1))))
updates.emit(DistributedMergeable.Update(otherSource, MergeableValue("Bat", Instant.fromEpochMilliseconds(2))))
delayUntil { state.value.value == "Bat" }
assertEquals("Bat", state.value.value)
}
private suspend fun delayUntil(step: Long= 5, max: Long = 10000, predicate: () -> Boolean) {
var t = 0L
while (!predicate() && t < max) {
delay(step)
t += step
}
if (t > max) throw Error("Timout")
}
private suspend fun runWhen(predicate: () -> Boolean, block: () -> Unit) {
delayUntil(predicate = predicate)
block()
}
This one works in JVM, but not in JS. In JS it claims state.value.value
is still equal to Bar
.
How can that be if the delayUntil
above works?
In the JS case I don't see the logging of the updates at all. Which I do see in the JVM case.Nick Allen
06/25/2022, 8:06 PMt==max
the delayUntil will return successfully even if the predicate is not true.Arjan van Wieringen
06/26/2022, 6:21 AM