KotlinLeaner
01/05/2024, 3:33 PMSharedFlows
and I'm trying to get only the latest value emitted by the flow. I've tried using the last()
operator, but it's not working as expected.
main
fun main(): Unit = runBlocking {
val eventBus = EventBus()
launch {
val latestValue = eventBus.value.lastOrNull()
if (latestValue != null) {
println("Latest value: $latestValue")
}
}
launch {
repeat(5) {
eventBus.increment(it)
}
}
launch {
delay(5.seconds)
(5..10).forEach {
eventBus.increment(it)
}
}
}
KotlinLeaner
01/05/2024, 3:33 PMclass EventBus {
private val _value = MutableSharedFlow<Int>()
val value: SharedFlow<Int> = _value.asSharedFlow()
suspend fun increment(number: Int) {
_value.emit(number)
}
}
Explanation
I want to capture only the latest value emitted by the observable. For instance, if the observable emits values from 0 to 4, and then pauses for a while, followed by values from 5 to 10, I want to retain only the final value for each interval. For the initial interval (0 to 4), I want to store 4, and for the second interval (5 to 10), I want to store 10. Is this achievable?
ThanksPeter Farlow
01/05/2024, 3:48 PMpublic interface SharedFlow<out T> : Flow<T> {
public val replayCache: List<T>
override suspend fun collect(collector: FlowCollector<T>): Nothing
}
you might notice that SharedFlow overrides Flow to change the collect
function. In the base Flow
interface, collect
returns Unit:
public suspend fun collect(collector: FlowCollector<T>)
However, because a SharedFlow never completes, it changes the signature to return Nothing
because the collect function on a SharedFlow never returns. This is why called last()
or lastOrNull()
doesn’t work as expected on a SharedFlow. last()
tries to collect the entire Flow, and when it completes, returns the final value. As stated before, SharedFlow does not complete ever.
Instead, you can use the replayCache
property of the SharedFlow API to get the most recently emitted value, assuming the SharedFlow was created with a replay value of at least 1. In your code sample, you’d want to change value
to:
private val _value = MutableSharedFlow<Int>(replay = 1)
and then you could access it in your main function with:
val latestValue = eventBus.value.replayCache.firstOrNull()
Peter Farlow
01/05/2024, 3:50 PMPeter Farlow
01/05/2024, 3:50 PMKotlinLeaner
01/05/2024, 3:55 PMfun main(): Unit = runBlocking {
val eventBus = EventBus()
launch {
val latestValue = eventBus.value.replayCache.firstOrNull()
if (latestValue != null) {
println("Latest value: $latestValue")
}
}
launch {
repeat(5) {
eventBus.increment(it)
}
}
launch {
delay(5.seconds)
(5..10).forEach {
delay(100)
eventBus.increment(it)
}
}
}
class EventBus {
private val _value = MutableSharedFlow<Int>(replay = 1)
val value: SharedFlow<Int> = _value.asSharedFlow()
suspend fun increment(number: Int) {
_value.emit(number)
}
}
Peter Farlow
01/05/2024, 3:57 PMKotlinLeaner
01/05/2024, 4:00 PMPeter Farlow
01/05/2024, 4:37 PM