#coroutines

Erik

05/03/2021, 1:50 PM
`MutableSharedFlow()` by default has no buffer (`replay = 0`, `extraBufferCapacity = 0`, i.e. `buffer = replay + extraBufferCapacity = 0`). Does that mean that if I have many emitters that call `emit`, they would suspend (in order) until the subscribers of the shared flow process the emissions one by one? So, in other words: has the buffer moved away from the shared flow, which intrinsically has no buffer space, to the coroutine scope(s) that now contain various suspended child coroutines trying to `emit`?

Tijl

05/03/2021, 2:00 PM
can’t vouch for the bottom part of your description, but yes, if there are subscribers, `emit` will suspend
🙏 1

Erik

05/03/2021, 2:07 PM
So my second question only holds while there are subscribers, am I right?

Tijl

05/03/2021, 2:09 PM
yeah, it’s a bit hard to parse what you are asking, to be honest, but yes, all `emit` calls will suspend until their emit is processed. there is no conflation, for example.
it’s worth noting that a SharedFlow with a buffer and `BufferOverflow.SUSPEND` will behave the same way (once the buffer is full)
I think what you might be getting at is “does this mean none of my emits will be dropped” and the answer is “yes” (as long as there are subscribers)

Erik

05/03/2021, 2:14 PM
So if I have a few subscribers of a shared flow. One of them is very slow. I emit one value. Then, while the slow subscriber is still processing the value, if I `tryEmit` another value, does it return `false` because not all subscribers were ready for a new value, yet?

Tijl

05/03/2021, 2:16 PM
`tryEmit` will always fail if you have no buffer to put it in (which you never have, since its size is `0`), unless there are no subscribers at all (this is documented behaviour in the link I gave)
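A minimal sketch of the two cases described above, assuming kotlinx.coroutines 1.4 or later is on the classpath. The function name `tryEmitWithoutBuffer` is made up for illustration; the library calls (`MutableSharedFlow`, `tryEmit`, `subscriptionCount`) are the public API.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns (tryEmit result without subscribers, tryEmit result with one subscriber).
fun tryEmitWithoutBuffer(): Pair<Boolean, Boolean> = runBlocking {
    val flow = MutableSharedFlow<Int>() // replay = 0, extraBufferCapacity = 0

    // No subscribers yet: the value is dropped and tryEmit reports success.
    val withoutSubscribers = flow.tryEmit(1)

    // Start a subscriber and wait until it is registered.
    val subscriber = launch { flow.collect { } }
    flow.subscriptionCount.first { it > 0 }

    // A subscriber exists but there is no buffer slot to put the value in.
    val withSubscriber = flow.tryEmit(2)

    subscriber.cancel()
    withoutSubscribers to withSubscriber
}

fun main() {
    println(tryEmitWithoutBuffer()) // prints (true, false)
}
```

Note that the `true` in the first case does not mean the value was delivered: with no subscribers and no replay cache it is simply lost.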

Erik

05/03/2021, 2:18 PM
I saw that, but what happens if a subscriber is slower than the producer that calls `tryEmit` faster than values are collected?
Because there is no buffer, the `tryEmit` must fail, I would guess

Tijl

05/03/2021, 2:18 PM
the subscriber will not receive anything ever if you use tryEmit

Erik

05/03/2021, 2:19 PM
I see it now, I read it wrong
Thanks again!

Tijl

05/03/2021, 2:20 PM
if you want that behaviour, you could, as I suggested, use a buffer (e.g. size 1) and `BufferOverflow.SUSPEND`; then `emit`s will always arrive, but a `tryEmit` only if the buffer is emptied in time
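A sketch of that suggestion, assuming kotlinx.coroutines 1.4 or later. The function name `tryEmitWithBuffer` and the two `CompletableDeferred` helpers (`firstTaken`, `gate`) are hypothetical, used only to make the slow subscriber deterministic.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

// Returns the results of three tryEmit calls against a one-slot buffer.
fun tryEmitWithBuffer(): List<Boolean> = runBlocking {
    val flow = MutableSharedFlow<Int>(
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.SUSPEND // also the default strategy
    )
    val firstTaken = CompletableDeferred<Unit>() // signals that the subscriber took a value
    val gate = CompletableDeferred<Unit>()       // stands in for slow work; never opened here

    val subscriber = launch {
        flow.collect {
            firstTaken.complete(Unit)
            gate.await()
        }
    }
    flow.subscriptionCount.first { it > 0 }      // wait for the subscriber to register

    val intoEmptyBuffer = flow.tryEmit(1)        // the single buffer slot is free
    val intoFullBuffer = flow.tryEmit(2)         // slot still occupied by 1
    firstTaken.await()                           // subscriber takes 1, freeing the slot
    val afterDrain = flow.tryEmit(3)             // slot is free again

    subscriber.cancel()
    listOf(intoEmptyBuffer, intoFullBuffer, afterDrain)
}

fun main() {
    println(tryEmitWithBuffer()) // prints [true, false, true]
}
```

A suspending `emit(2)` in place of the second `tryEmit` would instead wait for the slot to free up, so the value would still arrive.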

Erik

05/03/2021, 2:25 PM
I only wanted to understand why the default `MutableSharedFlow()` factory function has no buffer. It promotes using coroutines on the producing side.
That is not always practical, though, so that's where `tryEmit` and some buffer come in handy.
However, calling `MutableSharedFlow(...)` with arguments can look a bit verbose.

Tijl

05/03/2021, 2:39 PM
it was for sure a discussion point during development. But only one thing can be the default. There is essentially another default (`MutableStateFlow`) with a buffer size of 1 (and dropping), which essentially replaces `ConflatedBroadcastChannel`
I personally agree with the decision, however: default buffer sizes confuse people. Things will work during development and fail in production when the buffer is suddenly full. Better to make them specify it explicitly, so they are aware there is a buffer of a certain size.

Erik

05/03/2021, 2:53 PM
I agree, but I wanted to verify whether the buffer effectively moves elsewhere and still might grow, or not. For example: if there is no buffer and 1 slow subscriber, but fast callers of `emit`, then what happens while the subscriber is processing the first value and `emit` is called? Do `emit` calls suspend until successfully emitting the values, or do they return quickly and lose the value?
From a quick test I conclude that they suspend until the subscriber catches up
Effectively, that means that the buffer has moved from the shared flow (it has no buffer) to the suspended emitters (each buffering one value)
You can see the fast producers suspending while the bufferless shared flow is collected slowly
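The snippet from that quick test is not preserved in this log. A sketch of what such a test could look like follows, assuming kotlinx.coroutines 1.4 or later; the function name `slowCollectorDemo` and the `gate` helper are made up for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns (emitters still suspended while the collector is busy, values collected).
fun slowCollectorDemo(): Pair<Int, List<Int>> = runBlocking {
    val flow = MutableSharedFlow<Int>()      // no replay, no extra buffer
    val gate = CompletableDeferred<Unit>()   // stands in for slow processing
    val collected = mutableListOf<Int>()     // only touched on the runBlocking thread

    val collector = launch {
        flow.take(3).collect { value ->
            collected += value
            gate.await()                     // the first value waits here until the gate opens
        }
    }
    flow.subscriptionCount.first { it > 0 }  // wait for the subscriber to register

    // Three "fast" producers, each emitting one value from its own coroutine.
    val producers = (1..3).map { n -> launch { flow.emit(n) } }

    delay(100)                               // give every producer time to reach emit()
    val stillSuspended = producers.count { it.isActive }

    gate.complete(Unit)                      // the collector catches up
    producers.joinAll()
    collector.join()
    stillSuspended to collected
}

fun main() {
    val (suspended, values) = slowCollectorDemo()
    println("suspended while collector was busy: $suspended, collected: $values")
}
```

While the collector is stuck on its first value, the remaining producers stay suspended inside `emit`; once it catches up, every value arrives in order and nothing is dropped.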

Tijl

05/03/2021, 2:55 PM
yes, all emit calls suspend until they are processed by all subscribers. The only “special” behaviour is that subscribers can cancel while you are still suspended in your emit, but it still seems “normal”

Erik

05/03/2021, 2:59 PM
So, in conclusion, it's all by design. But the mindset needs to shift when reasoning about this. Having no buffer, but suspending producers instead, effectively still buffers values while there are slow subscribers. The price of a buffer is still paid, somehow, one way or another.

Tijl

05/03/2021, 2:59 PM
I mean you call it a buffer, but it’s a suspended call. Of course that holds your local state while you suspend in something, which I suppose you could call a “buffer” but that loosens the terminology quite a bit. I guess your entire stack and heap are a “buffer” too. All your RAM is a buffer, etc.

Erik

05/03/2021, 3:06 PM
True, but it still matters to understand where your emitted values are, or if they are lost
With suspending emitters, values can be in a buffer, or suspended on buffer overflow (with the suspend strategy), or lost (if no subscribers)
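The "lost (if no subscribers)" case can be sketched as follows, assuming kotlinx.coroutines 1.4 or later and a default `MutableSharedFlow()` with no replay cache. The function name `lostWithoutSubscribers` is hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns the values seen by a subscriber that joins after the first emission.
fun lostWithoutSubscribers(): List<Int> = runBlocking {
    val flow = MutableSharedFlow<Int>()
    val received = mutableListOf<Int>()

    flow.emit(1) // no subscribers: returns immediately and the value is dropped

    val subscriber = launch { flow.take(1).collect { received += it } }
    flow.subscriptionCount.first { it > 0 } // wait for the subscriber to register

    flow.emit(2) // suspends until the subscriber has received the value
    subscriber.join()
    received
}

fun main() {
    println(lostWithoutSubscribers()) // prints [2]
}
```

With `replay = 1` the late subscriber would also see the value `1`, because the most recent value is then kept in the replay cache.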

Francesc

05/03/2021, 3:37 PM
what you are describing is backpressure. When the buffer is full emitters suspend to let the collectors catch up (assuming you use `emit`).
thinking of it as "the buffer is on the coroutine side" is not right, there is no buffering, the emitters are suspended

Tijl

05/03/2021, 3:39 PM
@Erik yes, as I said:
I think what you might be getting at is “does this mean none of my emits will be dropped” and the answer is “yes” (as long as there are subscribers)
and that’s no accident of course 🙂

Erik

05/03/2021, 3:52 PM
@Francesc thanks. Giving it a different name is a better idea. Of course, backpressure is the word I was looking for here. So with the suspending `emit` calls, there is backpressure without blocking (unless the coroutine context that suspends is blocking, of course).
@Tijl I think I'll have to read https://github.com/Kotlin/kotlinx.coroutines/issues/2034 again to see why this particular design was chosen. It's kind of surprising that the suspending behaviour of `emit` depends on the buffer size, the buffer overflow strategy and the presence of subscribers. Especially the last one is a bit surprising, but I might find a good reason in the design documentation.

Tijl

05/03/2021, 4:02 PM
as @Francesc points out, it’s backpressure. Which is a kind of core principle of `Flow`s. so core you don’t even call it that most of the time. so it’s not surprising that’s default. I would go all the way back to basics. the original article about Flow and Kotlin, when SharedFlow was only a glimmer in the eye of Roman Elizarov https://elizarov.medium.com/simple-design-of-kotlin-flow-4725e7398c4c if you understand Flow, you understand why this default change is not so strange

Erik

05/03/2021, 4:09 PM
I've read Elizarov's articles very often. They're so great! This one I hadn't read in a while. It does answer my question. He points out that suspending the emitter until the subscriber(s) is (are) ready is natural with Kotlin coroutines. Given that, it makes sense to suspend emitters to a mutable shared flow while subscribers aren't ready yet. And it doesn't make sense to suspend while there is no subscriber: either add the value to the buffer, or drop it. Thanks @Tijl
👍 1