I’m in process of migration some parts of my codeb...
# coroutines
o
I’m in process of migration some parts of my codebase from Rx to coroutines and I wonder if I could get the similar behavior to
Copy code
private val subject = BehaviorSubject.create<Int>().startWith(Observale.just(1))
but with
SharedFlow
for example? My use case is that I want to trigger specific function(in case of coroutines
suspend
function) only once during creation of
SharedFlow
or after first collector started collecting this flow. I cannot use
onStart
or
onSubscription
because there are more than 1 collector for this flow and thus, my specific function will be triggered for every collector. So as a result a want to end up with something like
Copy code
private val sharedFlow = MutableSharedFlow<Int>(replay = 1).startWith {
  // some suspending call that will be called only once
}
and also get
MutableSharedFlow
type for
sharedFlow
variable to be able to call
emit
/`tryEmit` for modification of cached value so all collectors get notified it changed. Is it possible to implement this behavior?
j
My workaround:
Copy code
fun <T> MutableSharedFlow<T>.startWith(
    action: suspend FlowCollector<T>.() -> Unit
) = object : MutableSharedFlow<T> {

    private val isFirstEmission = AtomicBoolean(true)
    override val replayCache: List<T> = this@startWith.replayCache
    override val subscriptionCount: StateFlow<Int> = this@startWith.subscriptionCount

    override suspend fun collect(collector: FlowCollector<T>) {
        if (isFirstEmission.compareAndSet(true, false)) {
            collector.action()
        }
        this@startWith.collect(collector)
    }

    override suspend fun emit(value: T) {
        this@startWith.emit(value)
    }

    override fun resetReplayCache() {
        this@startWith.resetReplayCache()
    }

    override fun tryEmit(value: T): Boolean = this@startWith.tryEmit(value)
}
maybe helps you