Hi, I need to insert a initial value to a stream t...
# coroutines
u
Hi, I need to insert a initial value to a stream that sometimes doesn't emit, however if it emits within a window of time, take the real value, not the manual initial one TLDR; debounce first value - I have this
Copy code
private fun <T> Flow<T>.onStartDebounced(initialValue: T, debounceMillis: Long): Flow<T> {
    return onStart { emit(initialValue) }
        .withIndex()
        .debounce { if (it.index == 0) debounceMillis else 0L }
        .map { it.value }
}
Works, but im a Rx guy, is there something more idiomatic / performant (so I can actually learn something)?
b
How would you write this logic in Rx? Once you have that, check out the
Migration.kt
file for some insight on how to migrate it to
Flow
. That's normally helped some of my teammates when I work with them on getting something new as a Flow. Just remember,
Single
and
Maybe
are
suspend
functions now.
e
it's not more performant, but my first thought would be to stitch together a few other primitives,
Copy code
fun <T> Flow<T>.onStartDebounced(
    initialValue: T,
    debounceMillis: Long
) = channelFlow<T> {
    val job = launch {
        delay(debounceMillis)
        send(initialValue)
    }
    collect {
        job.cancel()
        send(it)
    }
}
u
@ephemient yea something like that, thanks
e
something like this would have less overhead than that
Copy code
fun <T> Flow<T>.onStartDebounced(
    initialValue: T,
    debounceMillis: Long
) = flow<T> {
    var hasValue = false
    emitAll(
        merge(
            onEach { hasValue = true },
            flowOf(initialValue)
                .onStart { delay(debounceMillis) }
                .filter { !hasValue }
        )
    )
}
I'd prefer either of these just due to less thinking required to determine that it's working right :)
u
the first is less performant becuase of the launch?
e
mostly because it requires bouncing through a channel
u
arent channels primitives backing all Flows?
e
most flows are probably cold and they don't involve channels
u
I see, i like both your samples. Btw is it safe to have hasValue as a plain boolean, not atomic one? (also it will continue to delay even when left side already emitted)
e
I believe it is safe because collection can only happen sequentially. yes, it will continue to delay even if the left side has emitted, so if you're creating millions of short-lived flows that could be an issue.
u
and because of that it needs to be atomic i believe. im going to go with first one, thanks!