Daniele Segato
10/05/2020, 11:00 AM
`MutableSharedFlow`. Apologies, as this is a very simplified example from a larger codebase, and hopefully it isn't merely demonstrating a gross misuse of `SharedFlow` and `onSubscription`.
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

private val counter = AtomicInteger()

class Example {
    private val scope = CoroutineScope(Job())
    private val _incoming = MutableSharedFlow<Int>(replay = 0)
    private val incoming: SharedFlow<String> = _incoming
        .onSubscription { emit(-1) }
        .map {
            // Emulate expensive operator that we want to "share".
            delay(500L)
            it.toString()
        }
        .shareIn(scope, replay = 0)

    suspend fun request(): String =
        incoming
            .onSubscription { _incoming.emit(counter.incrementAndGet()) }
            .first()
}

fun main() {
    val example = Example()
    repeat(10) {
        val result = runBlocking { example.request() }
        println(result)
    }
}
Output:
-1
2
3
4
5
6
7
8
9
10
`1` was expected after `-1` but is missing.
Daniele Segato
10/05/2020, 11:01 AM
With `replay = 0` on your `MutableSharedFlow`, its buffer size is 0, and by default the `BufferOverflow` behavior is to `SUSPEND` until every subscriber has received the value.
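A minimal sketch of the suspending behavior described above, assuming the released `kotlinx.coroutines` API rather than the preview branch discussed in this thread. The helper name `tryEmitWithSubscriber` and the `extraBufferCapacity = 1` contrast case are illustrative and not part of the original example. `tryEmit` is used as a probe: it returns `false` when a plain `emit` would have to suspend.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: reports whether tryEmit succeeds while one subscriber is attached.
fun tryEmitWithSubscriber(extraBufferCapacity: Int): Boolean = runBlocking {
    val flow = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = extraBufferCapacity)
    // UNDISPATCHED runs the collector up to its first suspension point,
    // so the subscription exists before we try to emit.
    val subscriber = launch(start = CoroutineStart.UNDISPATCHED) { flow.collect { } }
    val accepted = flow.tryEmit(1)
    subscriber.cancel()
    accepted
}

fun main() {
    // No buffer: the emitter must wait for the subscriber, so tryEmit gives up.
    println(tryEmitWithSubscriber(extraBufferCapacity = 0)) // false
    // One extra buffer slot (an assumption for contrast): the value is buffered.
    println(tryEmitWithSubscriber(extraBufferCapacity = 1)) // true
}
```

If the emitter must never wait for a busy subscriber, adding buffer capacity or choosing a different `BufferOverflow` strategy are the usual knobs, though whether either fits the original use case is not something this sketch can tell.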
You are running each call to `request()` inside `runBlocking`:
fun main() {
    val example = Example() // here shareIn is executed
    repeat(10) {
        val result = runBlocking { example.request() }
        println(result)
    }
}
which means the calls are sequential.
Your `_incoming` flow emits `-1` when you create `Example()` and `shareIn` subscribes, and then `1`, because you emit the next counter value at every call to `request()`:
suspend fun request(): String =
    incoming
        .onSubscription { _incoming.emit(counter.incrementAndGet()) }
        .first()
`counter` starts from 0, so the first item you emit to `_incoming` is `1`, when `request()` is first called.
But you use `first()`: it only takes the first item and immediately unsubscribes, so there will be no subscriber to receive your `1`, which will be lost.
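The point about a value emitted with no subscriber being lost can be sketched in isolation. This is a simplified illustration rather than the code from the example: the helper name `receivedValues` is hypothetical, it uses a single `MutableSharedFlow` without `shareIn`, and it assumes the released `kotlinx.coroutines` API.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: emits once before and once after a subscriber attaches.
fun receivedValues(): List<Int> = runBlocking {
    val flow = MutableSharedFlow<Int>(replay = 0)

    // No subscriber and no replay cache: emit returns immediately and the value is dropped.
    flow.emit(1)

    val received = mutableListOf<Int>()
    // UNDISPATCHED makes sure the collector is subscribed before the next emit.
    val subscriber = launch(start = CoroutineStart.UNDISPATCHED) {
        flow.collect { received += it }
    }

    // A subscriber exists now, so this value is delivered.
    flow.emit(2)
    yield() // give the collector a chance to finish handling the value
    subscriber.cancel()
    received.toList()
}

fun main() {
    println(receivedValues()) // [2]
}
```

With `replay = 0` nothing is cached for late subscribers, so whether a value is seen depends entirely on whether a subscription existed at the moment of the `emit`.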
After your `example.request()` completes you go into the next loop which, since the buffer is 0 and
I'm not sure if the `1` computation is just "canceled" because there is no subscriber, or if there is some kind of "`conflate`" deriving from the `BufferOverflow` behavior.
I imagine that `shareIn` will have some suspension loop waiting for emitted items: the first item, `-1`, is handled by your code, which then unsubscribes, so the next item, `1`, has no subscriber and is ignored (as in, it doesn't even enter the `map` operator)?
I'm not sure if this is how it works; I'll be happy to have someone expert confirm or elaborate.
(by the way how did you try SharedFlow? did you compile from code?)
Daniele Segato
10/05/2020, 11:11 AM
travis
10/05/2020, 5:32 PM
`onSubscription` documentation states: "The `action` is called before any value is emitted from the upstream flow to this subscription but after the subscription is established. It is guaranteed that all emissions to the upstream flow that happen inside or immediately after this `onSubscription` action will be collected by this subscription."
• The `incoming` `SharedFlow` is started `SharingStarted.Eagerly` (default)
travis
10/05/2020, 5:36 PM
> (by the way how did you try SharedFlow? did you compile from code?)
Yup, it was a bit kludgy: I forked the repo, rebased the `shared-flow` branch onto the `1.3.9` tag and modified some of the build config to publish to an internal Maven repo.
https://github.com/twyatt/kotlinx.coroutines/commits/shared-flow-1.3.9-github-packages
travis
10/05/2020, 9:20 PM
Daniele Segato
10/06/2020, 10:30 AM
travis
10/06/2020, 4:46 PM