Slackbot
06/25/2022, 9:17 AMArjan van Wieringen
06/25/2022, 10:51 AMdata class DistributedMergeable<T : Mergeable<T>>(
val states: MutableStateFlow<T>,
val updates: MutableSharedFlow<Update<T>>
) {
data class Update<T : Mergeable<T>>(val source: UUID, val value: T)
}
fun <T : Mergeable<T>> T.distribute(
updates: MutableSharedFlow<DistributedMergeable.Update<T>>,
scope: CoroutineScope = CoroutineScope(EmptyCoroutineContext)
): DistributedMergeable<T> {
val source = uuid()
val states = MutableStateFlow(this)
states.onEach { newState ->
println("$newState")
updates.emit(DistributedMergeable.Update(source, newState))
}.launchIn(scope)
updates.onEach { update ->
if (update.source == source) {
println("Same source: $update")
return@onEach
}
if (update.value == states.value) {
println("Same state: $update")
return@onEach
}
val merged = states.value.merge(update.value)
println("Merged: $update -> $merged")
states.value = merged
}.launchIn(scope)
return DistributedMergeable(states, updates)
}
fun <T : Mergeable<T>> T.distribute(
scope: CoroutineScope = CoroutineScope(EmptyCoroutineContext)
): DistributedMergeable<T> = distribute(MutableSharedFlow(), scope)
Basically, Mergeable provides a merge operation. And I distribute state updates in a sharedflow, and merge if they are different.
I have a basic test:
@Test
fun distributedTest2() = runTest {
val (state, updates) = MergeableValue("Bar", Instant.fromEpochMilliseconds(0)).distribute()
val otherSource = uuid()
updates.emit(DistributedMergeable.Update(otherSource, MergeableValue("Baz", Instant.fromEpochMilliseconds(1))))
updates.emit(DistributedMergeable.Update(otherSource, MergeableValue("Bat", Instant.fromEpochMilliseconds(2))))
delayUntil { state.value.value == "Bat" }
assertEquals("Bat", state.value.value)
}
private suspend fun delayUntil(step: Long= 5, max: Long = 10000, predicate: () -> Boolean) {
var t = 0L
while (!predicate() && t < max) {
delay(step)
t += step
}
if (t > max) throw Error("Timout")
}
private suspend fun runWhen(predicate: () -> Boolean, block: () -> Unit) {
delayUntil(predicate = predicate)
block()
}
This one works in JVM, but not in JS. In JS it claims state.value.value is still equal to Bar.
How can that be if the delayUntil above works?
In the JS case I don't see the logging of the updates at all. Which I do see in the JVM case.Nick Allen
06/25/2022, 8:06 PMt==max the delayUntil will return successfully even if the predicate is not true.Arjan van Wieringen
06/26/2022, 6:21 AM