What would the ideal primitive be if I wanted some...
# coroutines
g
What would the ideal primitive be if I wanted something that had the properties of: 1. Multicast (like BroadcastChannel) 2. Being buffered. All subscribers should never miss a value (like ArrayBroadcastChannel) 3. Being able to provide an initial value to new subscribers (like StateFlow)
l
@gpeal Your subscribers rely on receiving a value preceding the last one? Why not keep any required history in the value that a
StateFlow
would expose?
g
Just the latest one prior to the subscription
It seems that the conflated behavior of MutableStateFlow can cause subscribers to miss events if it is updated very quickly
Here is a sample snippet demonstrating #2
Copy code
@Test
    fun testStateFlow() = runBlocking {
        val flow = MutableStateFlow(0)
        flow.onEach {
            println(it)
        }.launchIn(this)
        delay(100)
        repeat(10) {
            flow.value = it
        }
    }
This prints 0 and 9 and misses 1-8
Copy code
val stateChannel = BroadcastChannel<S>(capacity = Channel.BUFFERED)
    var state = initialState
    val flow: Flow<S> get() = channelFlow {
        send(state)
        stateChannel.consumeEach { send(it) }
    }.distinctUntilChanged()
This seems to be working 🤔
l
@gpeal Beware, you're not updating
state
in your snippet, so it keeps the
initialState
.
s
For first test, you need to add yield after changing state. Otherwise it will update value in same cycle without continue any other continuation in runBlocking scope
Copy code
@Test
  fun testStateFlow() = runBlocking {
    val flow = MutableStateFlow(0)
    val job = flow
        .onEach { println(it) }
        .launchIn(this)
    delay(100)
    repeat(10) {
      flow.value = it
      yield()
    }
    job.cancel()
  }
this print every value
or you can launchIn dirent scope.
Copy code
private val scope = CoroutineScope(Dispatchers.Unconfined)

  @Test
  fun testStateFlow() = runBlocking {
    val flow = MutableStateFlow(0)
    val job = flow
        .onEach { println(it) }
        .launchIn(scope)
    delay(100)
    repeat(10) {
      flow.value = it
    }
    job.cancel()
  }
this also print every number
b
yield
just slightly improves behavior, but doesn't fix it completely because if consumer is slow, values will be loosed anyway (e.g. adding
delay(1)
after println) Unconfined dispatcher will work, but with a few trade-offs: 1. the whole flow chain must be in unconfined dispatcher, because switching to another will lead to loosing values (won't work if you wanna switch to main thread in android, while stateflow is updated from background thread) 2. no suspend calls allowed (if you add
delay(1)
after println, it will be the same) in the chain 3. It prevent fast collectors from fast collection due slow collector (e.g. if there are 9 collectors able to collect with 10hz, and one with 1hz, all collectors will collect at 1hz rate) Seems
Copy code
stateFlow.buffer(buffersize)
    .flowOn(Dispatchers.Unconfined)
    ... // could be used in any scope, suspends allowed, doesn't slow down other collectors
will work without mentioned trade-offs, but it will create separate buffer for each collector which is less effective than ArrayBroadcastChannel
s
I used unconfined, because it is test. It was an example to point different scope ddoesnt need yield. I do agree you points. On the otherhand I just want to point that using scope of runBlocking will make test almost single track execution.
g
@louiscad that was just for brevity but you can assume that it would be updated at the same time as items are added to the channel
@Sinan Kozak In our (me and @bezrukov) case, the consumers are users of a library so we can't control where and how the values will be consumed so it has to be robust against bad actors and all dispatchers