I'd like some advice on backpressure - Cold flows...
# coroutines
u
I'd like some advice on backpressure • Cold flows handle backpressure by default by suspending the emitter • Downstream can modify this behavior with
buffer
or
conflate
• Hot streams default behavior varies • MutableSharedFlow is suspend by default but can modify this behavior in the ctor • MutableStateFlow is conflated by default and you CANNOT modify this behavior • In both hot streams, downstream CANNOT modify this behavior (i.e. buffer or conflate dont do anything) Everything correct? (Most importantly whether downstream is able buffer on backpressure from hot upstream)
Ok now I'm super confused. This is what I'm trying to study with
Copy code
val sharedFlow = MutableSharedFlow<Int>()

scope.launch {
    intervalFlow(100)
        .collect {
            sharedFlow.emit(it)
        }
}
scope.launch {
    sharedFlow
        .buffer(capacity = Channel.UNLIMITED)
        .collect {
            Log.d("Default", "pre collect=$it")
            delay(1000)
            Log.d("Default", "collect=$it")
        }
}
and it works as expected, i.e. downstream operator controls the backpressure; the unlimited buffer works, collector doesn't miss anything.
buffer(capacity = Channel.CONFLATE
works as well as expected Whats the
MutableSharedFlow(onBufferOverflow)
then for?
This also confuses me in that it works as expected
Copy code
val stateFlow = MutableStateFlow<Int>(0)

scope.launch {
    intervalFlow(100)
        .collect {
            stateFlow.value = it
        }
}
scope.launch {
    stateFlow
        .buffer(capacity = UNLIMITED)
        .collect {
            Log.d("Default", "pre collect=$it")
            delay(1000)
            Log.d("Default", "collect=$it")
        }
}
I notice that MutableStateFlow is conflated on backpressure by default,but if I set a buffer of unlimited, then downstream sees everything Which I don't understand why it works, as the other day I had the issue of exactly which, UI being busy, missing stateflow emit somehow, so I added
buffer
operator and it did nothing; emit got lost
If I change the source from
intervalFlow
to
(0..200).asFlow()
then I only see the last value inthe second collector, and
buffer
does nothing.. why?
n
When you call
buffer(capacity = Buffer.UNLIMITED)
you are creating a
Channel
-based
Flow
that listens to the
SharedFlow
. When the
SharedFlow
tries to emit to subscribers, this this subscriber accepts the item immediately and adds it to its infinite buffer. The `SharedFlow`'s buffer never comes into play at all.
From the
SharedFlow
docs:
The buffer space determines how much slow subscribers can lag from the fast ones. When creating a shared flow, additional buffer capacity beyond replay can be reserved using the
extraBufferCapacity
parameter.
• In both hot streams, downstream CANNOT modify this behavior (i.e. buffer or conflate dont do anything)
This isn't quite correct. Such operators cannot affect the `SharedFlow`'s internal buffer capacity, but they do create buffers that affect the subscribing `Flow`'s behavior.
When you only get the last value, you are emitting all 200 values to the
StateFlow
before you even subscribe.
u
hmm okay, firstly, if I dont use hot source, just emit on io dispatcher frequently, then the collector downstream buffers these by default up to 64, .. so there is a implicit backpressure
buffer
to each operator, rght?
and youre saying, the mutablesharedflow's ctor param talks about some other buffer?
n
Idk what you mean with the io dispatcher, maybe a code snippet?
u
Just this
Copy code
scope.launch {
     intervalFlow(100ms)
         .flowOn(<http://Dispatchers.Io|Dispatchers.Io>)
         .collect {
             Log.d("Default", "pre collect=$it")
             delay(1000)
             Log.d("Default", "collect=$it")
         }
 }
What I see backpressure gets buffered up to 64+- and after that it throttles the source (intervalFlow is just a while loop with delay)
this implies the buffer() by default
n
There’s no default buffering in general. Operators that use a channel do this and indicate buffer behavior in their docs.
Like
transformLatest
docs:
This operator is buffered by default and size of its output buffer can be changed by applying subsequent buffer operator.
u
so what is it the causes it in my case? flowOn? also, if they dont then whats th behavior? suspend the source?