dimsuz
07/25/2020, 6:44 PMscan
in my Flow chain, it starts to act strange — items emitted by channels stop arriving into it. I tried to come up with minimal example to reproduce, posted on SO, would be grateful if anyone could take a look and help:
https://stackoverflow.com/q/63092171/258848Zach Klippenstein (he/him) [MOD]
07/25/2020, 7:08 PM.onEach { println(it) }
after extraValues.asFlow()
when you pass it to merge
, are you seeing the values there?dimsuz
07/25/2020, 7:17 PMdimsuz
07/25/2020, 7:17 PMscan
, they start to be printed there tooZach Klippenstein (he/him) [MOD]
07/25/2020, 7:46 PMextraValues
sends when that initial seed is emitted, those values will be sent into the channel before anyone is subscribed to it, which, per the BroadcastChannel documentation, means those values will be ignored (BroadcastChannels ignore values when there are no subscribers). Here’s the source: https://github.com/Kotlin/kotlinx.coroutines/blob/d55d8e80e152e46c170edc4dcfa08ccb7b2c4a03/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt#L87Zach Klippenstein (he/him) [MOD]
07/25/2020, 7:49 PMdimsuz
07/25/2020, 8:00 PMonStart
has this behavior too, I wanted to include this in my question, but decided not to add complexity 🙂
Thanks for explanation. I wonder how can I workaround this limitation. I really need the scan
here. Not sure if it's possible to write some scan
which would emit first value after collection has startedZach Klippenstein (he/him) [MOD]
07/25/2020, 8:03 PMproduceIn
), and then just consuming the channel until it’s closed.julian
07/25/2020, 8:05 PMif (it == 1)
is changed to if (it == 2)
, the code behaves as desired.dimsuz
07/25/2020, 8:08 PMextraValues
channel in my sample.
So whenever the very first reduce operation triggers this side effect, described behavior is observed: some incoming state is being lost...julian
07/25/2020, 8:11 PM... subscribing upstream with a channel before emitting (i.e. withCan you explain a bit more?), and then just consuming the channel until it’s closed.produceIn
Zach Klippenstein (he/him) [MOD]
07/25/2020, 8:22 PMcoroutineScope {
val subscription = this@scan.produceIn(this)
emit(seed)
subscription.consumeEach {
// rest of scan logic
}
}
julian
07/25/2020, 8:26 PMscan
implementation?dimsuz
07/25/2020, 8:32 PMZach Klippenstein (he/him) [MOD]
07/25/2020, 8:49 PMMutableSharedFlow
will fix this. I believe it has the same behavior as ArrayBroadcastChannel
re: dropping items sent to it when there are no subscribers.dimsuz
07/25/2020, 9:08 PMSharedFlow
has onSubscription
operator designed specifically to support this case. Unless I've misread something.Zach Klippenstein (he/him) [MOD]
07/25/2020, 9:41 PMSharedFlow
though.dimsuz
07/25/2020, 10:27 PMFlow
, no luck.