https://kotlinlang.org logo
#coroutines
Title
# coroutines
d

dimsuz

07/25/2020, 6:44 PM
Hi! When I introduce a
scan
in my Flow chain, it starts to act strange — items emitted by channels stop arriving into it. I tried to come up with minimal example to reproduce, posted on SO, would be grateful if anyone could take a look and help: https://stackoverflow.com/q/63092171/258848
z

Zach Klippenstein (he/him) [MOD]

07/25/2020, 7:08 PM
If you put an
.onEach { println(it) }
after
extraValues.asFlow()
when you pass it to
merge
, are you seeing the values there?
d

dimsuz

07/25/2020, 7:17 PM
no, nothing there
If I remove
scan
, they start to be printed there too
z

Zach Klippenstein (he/him) [MOD]

07/25/2020, 7:46 PM
I think I know what’s going on. Looking at the source, the seed accumulator value is emitted before starting to collect the upstream flow. Since you trigger the
extraValues
sends when that initial seed is emitted, those values will be sent into the channel before anyone is subscribed to it, which, per the BroadcastChannel documentation, means those values will be ignored (BroadcastChannels ignore values when there are no subscribers). Here’s the source: https://github.com/Kotlin/kotlinx.coroutines/blob/d55d8e80e152e46c170edc4dcfa08ccb7b2c4a03/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt#L87
This kind of issue has come up with at least one other operator (I think it was onStart?), where the upstream collection was happening later than expected (after the operator’s lambda was invoked), and weird things happen. Or at least, they seem weird coming from maybe RxJava experience where the subscription signal is, I think, fully propagated upstream before anything happens downstream.
👍🏾 1
d

dimsuz

07/25/2020, 8:00 PM
Exactly!
onStart
has this behavior too, I wanted to include this in my question, but decided not to add complexity 🙂 Thanks for explanation. I wonder how can I workaround this limitation. I really need the
scan
here. Not sure if it's possible to write some
scan
which would emit first value after collection has started
z

Zach Klippenstein (he/him) [MOD]

07/25/2020, 8:03 PM
I would start by filing a bug for this and describing your use case in depth. I am curious if @elizarov has an idea how to fix this, and if he has any thoughts about this as a more general, recurring issue. That said, it should be pretty straightforward to write your own version of the operator though, and you can fix the bug by subscribing upstream with a channel before emitting (i.e. with
produceIn
), and then just consuming the channel until it’s closed.
j

julian

07/25/2020, 8:05 PM
This supports @Zach Klippenstein (he/him) [MOD]’s explanation: if
if (it == 1)
is changed to
if (it == 2)
, the code behaves as desired.
1
d

dimsuz

07/25/2020, 8:08 PM
Yeah, it's not the first time I encounter this. Basically I'm often using redux-like "loop", which scans through user produced actions and it can also produce a side-effect along with the state. this side effect can trigger some external source which is represented by the
extraValues
channel in my sample. So whenever the very first reduce operation triggers this side effect, described behavior is observed: some incoming state is being lost...
j

julian

07/25/2020, 8:11 PM
@Zach Klippenstein (he/him) [MOD] I'm having trouble understanding this
... subscribing upstream with a channel before emitting (i.e. with 
produceIn
), and then just consuming the channel until it’s closed.
Can you explain a bit more?
z

Zach Klippenstein (he/him) [MOD]

07/25/2020, 8:22 PM
Copy code
coroutineScope {
  val subscription = this@scan.produceIn(this)
  emit(seed)
  subscription.consumeEach {
    // rest of scan logic
  }
}
j

julian

07/25/2020, 8:26 PM
@Zach Klippenstein (he/him) [MOD] Ah, I think I see. This is a modification of the
scan
implementation?
👍 1
d

dimsuz

07/25/2020, 8:32 PM
by the way it seems like this has been discussed before, I have found this issue: https://github.com/Kotlin/kotlinx.coroutines/issues/1758
👍🏾 1
z

Zach Klippenstein (he/him) [MOD]

07/25/2020, 8:49 PM
For the record, I don’t think
MutableSharedFlow
will fix this. I believe it has the same behavior as
ArrayBroadcastChannel
re: dropping items sent to it when there are no subscribers.
d

dimsuz

07/25/2020, 9:08 PM
Hmm. The last posts in the issue above seem to indicate that
SharedFlow
has
onSubscription
operator designed specifically to support this case. Unless I've misread something.
👍 1
z

Zach Klippenstein (he/him) [MOD]

07/25/2020, 9:41 PM
That only works if you've got direct access to the
SharedFlow
though.
d

dimsuz

07/25/2020, 10:27 PM
ah, yes. If this would be some API giving me only a
Flow
, no luck.
2 Views