gpeal
05/11/2020, 2:47 AM
louiscad
05/11/2020, 2:57 AM
StateFlow would expose?
gpeal
05/11/2020, 2:58 AM
@Test
fun testStateFlow() = runBlocking {
    val flow = MutableStateFlow(0)
    flow.onEach {
        println(it)
    }.launchIn(this)
    delay(100)
    repeat(10) {
        flow.value = it
    }
}
This prints 0 and 9 and misses 1-8.
val stateChannel = BroadcastChannel<S>(capacity = Channel.BUFFERED)
var state = initialState
val flow: Flow<S> get() = channelFlow {
    send(state)
    stateChannel.consumeEach { send(it) }
}.distinctUntilChanged()
This seems to be working
louiscad
05/11/2020, 6:32 AM
state in your snippet, so it keeps the initialState.
Sinan Kozak
05/11/2020, 6:48 AM
@Test
fun testStateFlow() = runBlocking {
    val flow = MutableStateFlow(0)
    val job = flow
        .onEach { println(it) }
        .launchIn(this)
    delay(100)
    repeat(10) {
        flow.value = it
        yield()
    }
    job.cancel()
}

private val scope = CoroutineScope(Dispatchers.Unconfined)
@Test
fun testStateFlow() = runBlocking {
    val flow = MutableStateFlow(0)
    val job = flow
        .onEach { println(it) }
        .launchIn(scope)
    delay(100)
    repeat(10) {
        flow.value = it
    }
    job.cancel()
}
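To compare the variants above side by side, here is a minimal sketch that records the collected values in a list instead of printing them. The helper name `observedValues` and its two flags are made up for illustration, and it assumes everything runs on the single `runBlocking` thread.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper (not from the thread): returns what a collector sees
// while 0..9 are written to a MutableStateFlow.
// unconfined      -> collect in a Dispatchers.Unconfined scope instead of runBlocking's scope
// yieldAfterWrite -> call yield() after every write, as in the yield() variant
fun observedValues(unconfined: Boolean, yieldAfterWrite: Boolean): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    val flow = MutableStateFlow(0)
    val collectorScope = if (unconfined) CoroutineScope(Dispatchers.Unconfined) else this
    val job = flow.onEach { seen += it }.launchIn(collectorScope)
    yield() // let the collector start and receive the initial value
    repeat(10) {
        flow.value = it
        if (yieldAfterWrite) yield()
    }
    yield() // give the collector one last turn before cancelling it
    job.cancel()
    seen.toList()
}

fun main() {
    println(observedValues(unconfined = false, yieldAfterWrite = false))
    println(observedValues(unconfined = false, yieldAfterWrite = true))
    println(observedValues(unconfined = true, yieldAfterWrite = false))
}
```

Only the first call should show the conflation described earlier (the initial value and the last one). The other two should report every value, because the collector gets to run between writes.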
bezrukov
05/11/2020, 8:33 AM
yield just slightly improves behavior, but doesn't fix it completely: if the consumer is slow, values will be lost anyway (e.g. after adding delay(1) after println).
The Unconfined dispatcher will work, but with a few trade-offs:
1. The whole flow chain must be in the Unconfined dispatcher, because switching to another one will lead to losing values (it won't work if you wanna switch to the main thread on Android while the StateFlow is updated from a background thread).
2. No suspend calls are allowed in the chain (if you add delay(1) after println, values will be lost just as before).
3. It prevents fast collectors from collecting quickly because of a slow collector (e.g. if there are 9 collectors able to collect at 10 Hz and one at 1 Hz, all collectors will collect at a 1 Hz rate).
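Trade-off 2 can be reproduced without relying on timing. This is only a sketch: the `CompletableDeferred` named `gate` is a stand-in for the `delay(1)` mentioned above, and the function name is made up for illustration.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach

// Hypothetical demo: an Unconfined collector that suspends inside onEach.
fun valuesSeenBySuspendingCollector(): List<Int> {
    val seen = mutableListOf<Int>()
    val flow = MutableStateFlow(0)
    val gate = CompletableDeferred<Unit>() // assumption: plays the role of delay(1)
    val scope = CoroutineScope(Dispatchers.Unconfined)

    // With Unconfined the collector starts right here, records the initial
    // value and then suspends on gate.await().
    flow.onEach {
        seen += it
        gate.await()
    }.launchIn(scope)

    // The collector is still suspended, so these writes overwrite each other.
    repeat(10) { flow.value = it }

    // Resuming the collector lets it read only the latest value.
    gate.complete(Unit)
    scope.cancel()
    return seen.toList()
}

fun main() {
    println(valuesSeenBySuspendingCollector())
}
```

Since the collector is parked in a suspend call while all ten writes happen, it should end up with only the initial value and the final one, which is the same loss as in the original test.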
It seems that
stateFlow.buffer(buffersize)
    .flowOn(Dispatchers.Unconfined)
    ... // could be used in any scope, suspends allowed, doesn't slow down other collectors
will work without the mentioned trade-offs, but it will create a separate buffer for each collector, which is less efficient than ArrayBroadcastChannel.
Sinan Kozak
05/11/2020, 8:41 AM
gpeal
05/11/2020, 1:59 PM