https://kotlinlang.org logo
#coroutines
Title
# coroutines
d

Daniele Segato

10/05/2020, 11:00 AM
@travis I believe you are the author of this comment? https://github.com/Kotlin/kotlinx.coroutines/pull/2069#issuecomment-702523785 Running into unexpected behavior when trying out 
MutableSharedFlow
. Apologies, as this is a very simplified example from a larger codebase and hopefully it isn't merely demonstrating a gross misuse of 
SharedFlow
 and 
onSubscription
.
Copy code
private val counter = AtomicInteger()

class Example {

    private val scope = CoroutineScope(Job())

    private val _incoming = MutableSharedFlow<Int>(replay = 0)
    private val incoming: SharedFlow<String> = _incoming
        .onSubscription { emit(-1) }
        .map {
            // Emulate expensive operator that we want to "share".
            delay(500L)
            it.toString()
        }
        .shareIn(scope, replay = 0)

    suspend fun request(): String =
        incoming
            .onSubscription { _incoming.emit(counter.incrementAndGet()) }
            .first()
}

fun main() {
    val example = Example()
    repeat(10) {
        val result = runBlocking { example.request() }
        println(result)
    }
}
Output:
Copy code
-1
2
3
4
5
6
7
8
9
10
1
 was expected after 
-1
 but is missing.
I'm no expert but I try to answer you cause I think it would help understanding how it works since you have
replay = 0
on your MutableSharedFlow its buffer size is 0 and by default the
BufferOverflow
behavior is to
SUSPEND
until every subscriber got the value. you are running each call to request into a runBlocking:
Copy code
fun main() {
    val example = Example() // here shareIn is executed
    repeat(10) {
        val result = runBlocking { example.request() }
        println(result)
    }
}
which means they are sequential your
_incoming
flow emit
-1
when you create
Example()
subscribe and
1
because you emit it at every call to
request()
Copy code
suspend fun request(): String =
        incoming
            .onSubscription { _incoming.emit(counter.incrementAndGet()) }
            .first()
counter start from 0, so the first item you emit to
_incoming
is 1 when
request()
is first called but you use
first()
: it only take the first item and immediately unsubscribe. so there will be no subscriber to receive your
1
that will be lost. after your
example.request()
completes you go into the next loop which, since the buffer is 0 and I'm not sure if the
1
computation is just "canceled" because no subscriber or if there is some kind of "`conflate` " deriving from the BufferOverflow behavior I imagine that the
shareIn
will have some suspension loop waiting for emitted items the first item
-1
is handled by your code that than unsubscribe so the next item
1
have no subscriber and is ignored (as in = doesn't even enter the map operator)? I'm not sure if this is how it works, will be happy to have someone expert confirm / elaborate. (by the way how did you try SharedFlow? did you compile from code?)
(I'm sorry if i bothered you by pinging you Travis)
t

travis

10/05/2020, 5:32 PM
Not a problem at all pinging me; totally appreciate opening up dialog around this. We're all learning this together. simple smile To your point, I agree, my best guess is that it's an issue with a subscriber not being available. Some assumptions I made, that make me think it shouldn't lose a value: •
onSubscription
documentation states: "The
action
is called before any value is emitted from the upstream flow to this subscription but after the subscription is established. It is guaranteed that all emissions to the upstream flow that happen inside or immediately after this
onSubscription
action will be collected by this subscription." • The
incoming
SharedFlow
is started
SharingStarted.Eagerly
(default)
(by the way how did you try SharedFlow? did you compile from code?)
Yup, it was a bit kludgy: I forked the repo, rebased the
shared-flow
branch onto the
1.3.9
tag and modified some of build config to publish to an internal Maven repo. https://github.com/twyatt/kotlinx.coroutines/commits/shared-flow-1.3.9-github-packages
Looked at it some more, discussion gave me the hints to figure out what I was doing wrong. Commented on the PR. Thanks for the help @Daniele Segato!
d

Daniele Segato

10/06/2020, 10:30 AM
I hope someone can explain exactly what is causing the behavior cause I think it would help understand how share work. If my reasoning is right it means your shared work will do nothing unless there is a subscriber. I really would like to ping Roman Elizarov here but I'm not sure if this is ok with him
t

travis

10/06/2020, 4:46 PM
I added a follow-up question on the PR, I'm hoping it will clarify some things (maybe Roman will answer when he has some free time). I imagine the whole JetBrains team is super busy preparing for https://kotlinlang.org/lp/event-14/.
4 Views