Yevhenii Datsenko
01/05/2021, 12:35 PMA default implementation of a shared flow that is created with MutableSharedFlow() constructor function without parameters has no replay cache nor additional buffer. emit call to such a shared flow suspends until all subscribers receive the emitted value and returns immediately if there are no subscribers. Thus, tryEmit call succeeds and returns true only if there are no subscribers (in which case the emitted value is immediately lost).That means that we might lose some emitted values. To tackle this problem I created the following function:
/**
* This function emits value when the receiver shared flow gets the first subscriber
* It helps to avoid loosing events while view is not subscribed to shared flow
*/
suspend fun <T> MutableSharedFlow<T>.emitLazily(value: T) {
subscriptionCount.first { it > 0 }
emit(value)
}
The question is will the first
function throw an NoSuchElementException
in this case? As far as I understand, it should not do it 😄) Because, the shared flow never completes and we can not convert it into a completing one without adding subscriber.
It will be extremely helpful to hear your thoughts))nrobi
01/05/2021, 12:39 PMBufferOverflow
, which by default is .SUSPEND
= if there is no subscriber, then it’ll suspend until someone subscribes (basically your emitLazily
functionality)Tijl
01/05/2021, 12:48 PMYevhenii Datsenko
01/05/2021, 12:58 PMrunBlockingTest {
val sharedFlow = MutableSharedFlow<Int>(extraBufferCapacity = 2)
val receivedValues = mutableListOf<Int>()
launch {
sharedFlow.emit(1)
sharedFlow.emit(2)
}
val job = launch { sharedFlow.toList(receivedValues) }
receivedValues shouldHaveSize 0
job.cancel()
}
This test passes. Basically, it means that extraBufferCapacity
works when we have subscriber. Otherwise, shared flow will just skip the items.
By the way, I can not use replay = 1
, because I don’t want to get last value at the next subscription.Tijl
01/05/2021, 1:38 PMoverride suspend fun emit(value: T) {
if (tryEmit(value)) return // fast-path
emitSuspend(value)
}
where tryEmit returns true if there are no collector and there is no replay cache
private fun tryEmitNoCollectorsLocked(value: T): Boolean {
assert { nCollectors == 0 }
if (replay == 0) return true // no need to replay, just forget it now
you could open an issue with your usecase, maybe a Buffer behaviour could be added for your scenario.
not sure how to do it in a safe way now without any replay.Yevhenii Datsenko
01/05/2021, 1:39 PMTijl
01/05/2021, 1:43 PM