# coroutines
a
Okay, I've worked it out a bit more and I have no clue what is going wrong. I've simplified my code to the following:
data class DistributedMergeable<T : Mergeable<T>>(
    val states: MutableStateFlow<T>,
    val updates: MutableSharedFlow<Update<T>>
) {
    data class Update<T : Mergeable<T>>(val source: UUID, val value: T)
}

fun <T : Mergeable<T>> T.distribute(
    updates: MutableSharedFlow<DistributedMergeable.Update<T>>,
    scope: CoroutineScope = CoroutineScope(EmptyCoroutineContext)
): DistributedMergeable<T> {
    val source = uuid()
    val states = MutableStateFlow(this)

    states.onEach { newState ->
        println("$newState")
        updates.emit(DistributedMergeable.Update(source, newState))
    }.launchIn(scope)

    updates.onEach { update ->
        if (update.source == source) {
            println("Same source: $update")
            return@onEach
        }
        if (update.value == states.value) {
            println("Same state: $update")
            return@onEach
        }
        val merged = states.value.merge(update.value)
        println("Merged: $update -> $merged")
        states.value = merged
    }.launchIn(scope)

    return DistributedMergeable(states, updates)
}

fun <T : Mergeable<T>> T.distribute(
    scope: CoroutineScope = CoroutineScope(EmptyCoroutineContext)
): DistributedMergeable<T> = distribute(MutableSharedFlow(), scope)
Basically, `Mergeable` provides a merge operation. I distribute state updates through a shared flow and merge incoming updates if they differ from the current state. I have a basic test:
@Test
    fun distributedTest2() = runTest {
        val (state, updates) = MergeableValue("Bar", Instant.fromEpochMilliseconds(0)).distribute()

        val otherSource = uuid()
        updates.emit(DistributedMergeable.Update(otherSource, MergeableValue("Baz", Instant.fromEpochMilliseconds(1))))
        updates.emit(DistributedMergeable.Update(otherSource, MergeableValue("Bat", Instant.fromEpochMilliseconds(2))))

        delayUntil { state.value.value == "Bat" }
        assertEquals("Bat", state.value.value)
    }

private suspend fun delayUntil(step: Long = 5, max: Long = 10000, predicate: () -> Boolean) {
    var t = 0L
    while (!predicate() && t < max) {
        delay(step)
        t += step
    }
    if (t > max) throw Error("Timeout")
}

private suspend fun runWhen(predicate: () -> Boolean, block: () -> Unit) {
    delayUntil(predicate = predicate)
    block()
}
This one works on the JVM, but not in JS. In JS it claims `state.value.value` is still equal to `Bar`. How can that be if the `delayUntil` above works? In the JS case I don't see the logging of the updates at all, which I do see in the JVM case.
n
I do notice that if `t == max`, `delayUntil` will return successfully even if the predicate is not true.
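A sketch of one way to close that gap: decide on the timeout inside the loop, while the predicate is known to be false, instead of comparing the counter with `max` afterwards. The name `delayUntilOrThrow` and the use of `IllegalStateException` are illustrative choices, not something from the original code.

```kotlin
import kotlinx.coroutines.delay

// Hypothetical replacement for delayUntil: it only returns normally
// when the predicate has actually become true.
suspend fun delayUntilOrThrow(
    step: Long = 5,
    max: Long = 10_000,
    predicate: () -> Boolean
) {
    var elapsed = 0L
    while (!predicate()) {
        // The predicate is false here, so reaching the limit is a real timeout.
        if (elapsed >= max) throw IllegalStateException("Timeout after $elapsed ms")
        delay(step)
        elapsed += step
    }
}
```

With this version the test would fail with a timeout on JS instead of falling through to `assertEquals` with the state still at `Bar`.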
a
Yeah, I noticed that too, and that's why I deleted the thread 😉 But thanks!