ursus
01/01/2022, 3:38 PM
buffer or conflate
• Hot streams default behavior varies
• MutableSharedFlow suspends by default, but you can change this behavior in the constructor
• MutableStateFlow is conflated by default and you CANNOT change this behavior
• In both hot streams, downstream CANNOT modify this behavior (i.e. buffer or conflate don't do anything)
Everything correct?
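To make the conflation bullet concrete, here is a minimal sketch, assuming kotlinx.coroutines is on the classpath. The helper name `latestAfterBurst` is hypothetical and only serves this illustration. It writes a burst of values into a MutableStateFlow before anyone subscribes and then reads what a new subscriber would see first.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not from the thread): writes 1..200 into a StateFlow
// with no subscriber attached, then returns the first value a late subscriber sees.
fun latestAfterBurst(): Int = runBlocking {
    val state = MutableStateFlow(0)
    // Each assignment overwrites the previous value; a StateFlow keeps no history.
    for (i in 1..200) state.value = i
    // A subscriber that arrives now only receives the current value.
    state.first()
}
```

Calling `latestAfterBurst()` should return 200: the intermediate values were overwritten before any subscriber existed, so no downstream operator could have captured them.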
(Most importantly, whether downstream is able to buffer on backpressure from a hot upstream)
ursus
01/01/2022, 6:52 PM
val sharedFlow = MutableSharedFlow<Int>()
scope.launch {
    intervalFlow(100)
        .collect {
            sharedFlow.emit(it)
        }
}
scope.launch {
    sharedFlow
        .buffer(capacity = Channel.UNLIMITED)
        .collect {
            Log.d("Default", "pre collect=$it")
            delay(1000)
            Log.d("Default", "collect=$it")
        }
}
and it works as expected, i.e. the downstream operator controls the backpressure: the unlimited buffer works and the collector doesn't miss anything. buffer(capacity = Channel.CONFLATED) works as expected as well.
What's the MutableSharedFlow(onBufferOverflow) then for?
ursus
01/01/2022, 6:58 PM
val stateFlow = MutableStateFlow<Int>(0)
scope.launch {
    intervalFlow(100)
        .collect {
            stateFlow.value = it
        }
}
scope.launch {
    stateFlow
        .buffer(capacity = UNLIMITED)
        .collect {
            Log.d("Default", "pre collect=$it")
            delay(1000)
            Log.d("Default", "collect=$it")
        }
}
I notice that MutableStateFlow is conflated on backpressure by default, but if I set an unlimited buffer, then downstream sees everything.
I don't understand why this works, because the other day I had exactly this issue: the UI was busy and somehow missed a StateFlow emission, so I added the buffer operator and it did nothing; the emission got lost
ursus
01/01/2022, 7:03 PM
If I change intervalFlow to (0..200).asFlow() then I only see the last value in the second collector, and buffer does nothing. Why?
Nick Allen
01/01/2022, 8:35 PM
With buffer(capacity = Channel.UNLIMITED) you are creating a Channel-based Flow that listens to the SharedFlow. When the SharedFlow tries to emit to subscribers, this subscriber accepts the item immediately and adds it to its infinite buffer. The `SharedFlow`'s buffer never comes into play at all.
Nick Allen
01/01/2022, 8:35 PM
SharedFlow docs:
The buffer space determines how much slow subscribers can lag from the fast ones. When creating a shared flow, additional buffer capacity beyond replay can be reserved using the extraBufferCapacity parameter.
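The effect of extraBufferCapacity can be sketched with tryEmit, which reports whether a value was accepted without suspending. This is a minimal illustration, assuming kotlinx.coroutines; the helper name `tryEmitResults` is hypothetical. One subscriber is registered but never gets a chance to run between the emissions, so it plays the role of the slow subscriber.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not from the thread): emits three values back to back
// while one subscriber is attached, and records whether each tryEmit succeeded.
fun tryEmitResults(extraBufferCapacity: Int): List<Boolean> = runBlocking {
    // onBufferOverflow keeps its default (SUSPEND), so a full buffer makes tryEmit fail.
    val shared = MutableSharedFlow<Int>(extraBufferCapacity = extraBufferCapacity)

    // UNDISPATCHED start runs the coroutine up to its first suspension,
    // so the subscriber is registered by the time launch returns.
    val subscriber = launch(start = CoroutineStart.UNDISPATCHED) {
        shared.collect { }
    }

    // No suspension point between these calls, so the subscriber cannot drain the buffer.
    val results = List(3) { shared.tryEmit(it) }

    subscriber.cancel()
    results
}
```

Under these assumptions `tryEmitResults(2)` should give `[true, true, false]`: two values fit into the reserved buffer and the third would have to suspend. `tryEmitResults(0)` should give `[false, false, false]`, which matches the documented behavior that tryEmit on a default MutableSharedFlow only succeeds when there are no subscribers.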
Nick Allen
01/01/2022, 8:41 PM
• In both hot streams, downstream CANNOT modify this behavior (i.e. buffer or conflate don't do anything)
This isn't quite correct. Such operators cannot affect the `SharedFlow`'s internal buffer capacity, but they do create buffers that affect the subscribing `Flow`'s behavior.
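A minimal sketch of that point, assuming kotlinx.coroutines: the SharedFlow below has no buffer of its own, yet a slow collector behind buffer(Channel.UNLIMITED) still receives every value, because the buffer lives on the subscriber side. The helper name `slowSubscriberWithOwnBuffer`, the value count, and the delay are assumptions chosen for illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not from the thread): a slow collector with its own
// unlimited buffer in front of a default MutableSharedFlow.
fun slowSubscriberWithOwnBuffer(): List<Int> = runBlocking {
    val shared = MutableSharedFlow<Int>() // replay = 0, extraBufferCapacity = 0
    val received = mutableListOf<Int>()

    val collector = launch {
        shared
            .buffer(Channel.UNLIMITED) // buffer owned by this subscriber, not by the SharedFlow
            .take(5)                   // complete after five values so the sketch terminates
            .collect {
                delay(50)              // simulate a busy collector
                received += it
            }
    }

    // Wait until the subscriber is registered; earlier emissions would be lost.
    shared.subscriptionCount.first { it > 0 }

    // Each emit returns as soon as the subscriber's buffer has accepted the value.
    for (i in 1..5) shared.emit(i)

    collector.join()
    received
}
```

Calling `slowSubscriberWithOwnBuffer()` should return `[1, 2, 3, 4, 5]`. The emitter finishes long before the collector has processed the values, and the SharedFlow's constructor parameters are never involved.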
Nick Allen
01/01/2022, 8:50 PM
StateFlow before you even subscribe.
ursus
01/01/2022, 9:22 PM
buffer to each operator, right?
ursus
01/01/2022, 9:23 PM
Nick Allen
01/01/2022, 9:34 PM
ursus
01/01/2022, 9:48 PM
scope.launch {
    intervalFlow(100) // emits every 100 ms
        .flowOn(Dispatchers.IO)
        .collect {
            Log.d("Default", "pre collect=$it")
            delay(1000)
            Log.d("Default", "collect=$it")
        }
}
What I see is that backpressure gets buffered up to roughly 64 items, and after that it throttles the source
(intervalFlow is just a while loop with delay)
ursus
01/01/2022, 10:02 PM
Nick Allen
01/02/2022, 1:07 AM
Nick Allen
01/02/2022, 1:08 AM
transformLatest docs:
This operator is buffered by default and size of its output buffer can be changed by applying subsequent buffer operator.
ursus
01/02/2022, 1:19 AM