I'm working with Kotlin `SharedFlows` and I'm tryi...
# flow
k
I'm working with Kotlin
SharedFlows
and I'm trying to get only the latest value emitted by the flow. I've tried using the
last()
operator, but it's not working as expected. main
Copy code
fun main(): Unit = runBlocking {
    val eventBus = EventBus()
    launch {
        val latestValue = eventBus.value.lastOrNull()
        if (latestValue != null) {
            println("Latest value: $latestValue")
        }
    }
    launch {
        repeat(5) {
            eventBus.increment(it)
        }
    }
    launch {
        delay(5.seconds)
        (5..10).forEach {
            eventBus.increment(it)
        }
    }
}
EventBus
Copy code
class EventBus {
    private val _value = MutableSharedFlow<Int>()
    val value: SharedFlow<Int> = _value.asSharedFlow()

    suspend fun increment(number: Int) {
        _value.emit(number)
    }
}
Explanation I want to capture only the latest value emitted by the observable. For instance, if the observable emits values from 0 to 4, and then pauses for a while, followed by values from 5 to 10, I want to retain only the final value for each interval. For the initial interval (0 to 4), I want to store 4, and for the second interval (5 to 10), I want to store 10. Is this achievable? Thanks
p
This is the entire public API for SharedFlow:
Copy code
public interface SharedFlow<out T> : Flow<T> {
     
    public val replayCache: List<T>

    override suspend fun collect(collector: FlowCollector<T>): Nothing
}
you might notice that SharedFlow overrides Flow to change the
collect
function. In the base
Flow
interface,
collect
returns Unit:
Copy code
public suspend fun collect(collector: FlowCollector<T>)
However, because a SharedFlow never completes, it changes the signature to return
Nothing
because the collect function on a SharedFlow never returns. This is why called
last()
or
lastOrNull()
doesn’t work as expected on a SharedFlow.
last()
tries to collect the entire Flow, and when it completes, returns the final value. As stated before, SharedFlow does not complete ever. Instead, you can use the
replayCache
property of the SharedFlow API to get the most recently emitted value, assuming the SharedFlow was created with a replay value of at least 1. In your code sample, you’d want to change
value
to:
Copy code
private val _value = MutableSharedFlow<Int>(replay = 1)
and then you could access it in your main function with:
Copy code
val latestValue = eventBus.value.replayCache.firstOrNull()
What I don’t remember off the top of my head is, if your SharedFlow is configured with a replay of greater than 1, if the more recent values are at the start of the replayCache list or at the end of the list
but if the replay is 1, it won’t matter
k
Thanks Peter for great explanation. I followed your suggestion and update my code. It still nothing printing in the console.
Copy code
fun main(): Unit = runBlocking {
    val eventBus = EventBus()
    launch {
        val latestValue = eventBus.value.replayCache.firstOrNull()
        if (latestValue != null) {
            println("Latest value: $latestValue")
        }
    }
    launch {
        repeat(5) {
            eventBus.increment(it)
        }
    }
    launch {
        delay(5.seconds)
        (5..10).forEach {
            delay(100)
            eventBus.increment(it)
        }
    }
}

class EventBus {
    private val _value = MutableSharedFlow<Int>(replay = 1)
    val value: SharedFlow<Int> = _value.asSharedFlow()

    suspend fun increment(number: Int) {
        _value.emit(number)
    }
}
p
What value do you want it to print? Are you trying to make such that it would print 10?
k
If the observable emits values 0 to 4 and then pauses, followed by values 5 to 10, I want to save only the last value in each interval. For the first interval (0 to 4), I want to save 4, and for the second interval (5 to 10), I want to save 10.
p
outside of lastOrNull vs replayCache, you’d need to change your code accomplish that. The program as written is only ever going to print a single value, not multiple values. You could try collecting the SharedFlow and the debounce operator but I don’t know if this is a specific problem you’re trying to solve
gratitude thank you 1