https://kotlinlang.org logo
Title
y

Yevhenii Datsenko

01/05/2021, 12:35 PM
Hello everyone and Happy New Year 🎉 I have a question about SharedFlow. According to documentation
A default implementation of a shared flow that is created with MutableSharedFlow() constructor function without parameters has no replay cache nor additional buffer. emit call to such a shared flow suspends until all subscribers receive the emitted value and returns immediately if there are no subscribers. Thus, tryEmit call succeeds and returns true only if there are no subscribers (in which case the emitted value is immediately lost).
That means that we might lose some emitted values. To tackle this problem I created the following function:
/**
 * This function emits value when the receiver shared flow gets the first subscriber
 * It helps to avoid loosing events while view is not subscribed to shared flow
 */
suspend fun <T> MutableSharedFlow<T>.emitLazily(value: T) {
    subscriptionCount.first { it > 0 }
    emit(value)
}
The question is will the 
first
 function throw an 
NoSuchElementException
 in this case? As far as I understand, it should not do it 😄) Because, the shared flow never completes and we can not convert it into a completing one without adding subscriber. It will be extremely helpful to hear your thoughts))
n

nrobi

01/05/2021, 12:39 PM
You won’t necessarily lose emitted values, you can play around with the
BufferOverflow
, which by default is
.SUSPEND
= if there is no subscriber, then it’ll suspend until someone subscribes (basically your
emitLazily
functionality)
t

Tijl

01/05/2021, 12:48 PM
there is no guarantee subscription will still be >0 by the time you emit, use built in mechanism from SharedFlow like @nrobi suggests (like a buffer of size 1 + SUSPEND)
y

Yevhenii Datsenko

01/05/2021, 12:58 PM
Thanks for your answers. I tried this but it did’t help. Please take a look at this test
runBlockingTest {
    val sharedFlow = MutableSharedFlow<Int>(extraBufferCapacity = 2)
    val receivedValues = mutableListOf<Int>()

    launch {
        sharedFlow.emit(1)
        sharedFlow.emit(2)
    }
    val job = launch { sharedFlow.toList(receivedValues) }

    receivedValues shouldHaveSize 0
    job.cancel()
}
This test passes. Basically, it means that
extraBufferCapacity
works when we have subscriber. Otherwise, shared flow will just skip the items. By the way, I can not use
replay = 1
, because I don’t want to get last value at the next subscription.
t

Tijl

01/05/2021, 1:38 PM
hm yeah this is due to the implementation of emit being
override suspend fun emit(value: T) {
        if (tryEmit(value)) return // fast-path
        emitSuspend(value)
    }
where tryEmit returns true if there are no collector and there is no replay cache
private fun tryEmitNoCollectorsLocked(value: T): Boolean {
        assert { nCollectors == 0 }
        if (replay == 0) return true // no need to replay, just forget it now
you could open an issue with your usecase, maybe a Buffer behaviour could be added for your scenario. not sure how to do it in a safe way now without any replay.
y

Yevhenii Datsenko

01/05/2021, 1:39 PM
okay, thanks a lot for your time) I will create an issue and post the link here)
t

Tijl

01/05/2021, 1:43 PM
do you need the “Shared” aspect of SharedFlow? I feel the reason this is not supported well is because in any scenario with more than one collector it’s a bit esoteric (your emits are essentially processed by ‘once or more’), you could consider just using a Channel if you want to suspend until stuff is processed once (and then still emit to a SharedFlow if you also need to optionally share it)
1
in any case, good luck