# coroutines
m
For a given flow, is there a way to emit an initial value should the first value not be emitted before some timeout?
My use case for this is when using combine, where you don’t want an input flow to potentially hold up the combine flow forever.
fun <T> Flow<T>.emitInitialValueOnTimeout(
    timeoutMillis: Long,
    initialValue: T
): Flow<T> = flow {
    try {
        withTimeout(timeoutMillis) {
            emit(this@emitInitialValueOnTimeout.first())
        }
        emitAll(this@emitInitialValueOnTimeout.drop(1))
    } catch (e: TimeoutCancellationException) {
        emit(initialValue)
        emitAll(this@emitInitialValueOnTimeout)
    }
}
https://pl.kotl.in/IpOwN6OK3 But the first() followed by drop(1) doesn’t look safe to me.
z
You want to apply this downstream of the combine?
Your impl assumes the upstream is a hot flow, which might not be true generally
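To illustrate the concern, here is a minimal sketch showing that first() followed by drop(1) starts a cold upstream twice. The names cold, collections and firstThenRest are hypothetical, and the timeout is left out to keep the example small.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Counts how many times the cold upstream block is started (illustration only).
var collections = 0

// A cold flow: the builder block runs again for every collector.
val cold: Flow<Int> = flow {
    collections++
    emit(1)
    emit(2)
}

// Same first()/drop(1) shape as the snippet above, without the timeout.
fun <T> Flow<T>.firstThenRest(): Flow<T> = flow {
    emit(this@firstThenRest.first())      // first collection of the upstream
    emitAll(this@firstThenRest.drop(1))   // second, independent collection
}

fun main() = runBlocking {
    val values = cold.firstThenRest().toList()
    println("values=$values, upstream started $collections times")
    // prints "values=[1, 2], upstream started 2 times"
}
```

For an upstream with side effects (a network call, a database query) the second start repeats that work, and the two collections are not guaranteed to see the same sequence of values.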
m
I would apply it to each input of combine.
Maybe the receiver flow should be converted to a SharedFlow first?
z
This should work: https://pl.kotl.in/K7EUWBvDI
fun <T> Flow<T>.emitInitialAfterTimeout(
    timeoutMillis: Long,
    initialValue: T
): Flow<T> = channelFlow {
    var fallbackJob: Job? = launch {
        delay(timeoutMillis)
        send(initialValue)
    }

    collect {
        fallbackJob?.let {
            it.cancelAndJoin()
            fallbackJob = null
        }
        send(it)
    }
}
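A rough usage sketch for the combine case from the original question, assuming the extension above is copied as-is (only the shadowed `it` is renamed). The flows fast and slow and the value "placeholder" are made up for illustration, and the delays are real time rather than virtual test time.

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun <T> Flow<T>.emitInitialAfterTimeout(
    timeoutMillis: Long,
    initialValue: T
): Flow<T> = channelFlow {
    // Sends the fallback unless a real value arrives first.
    var fallbackJob: Job? = launch {
        delay(timeoutMillis)
        send(initialValue)
    }

    collect { value ->
        // A real value arrived: make sure the fallback can no longer fire.
        fallbackJob?.let { job ->
            job.cancelAndJoin()
            fallbackJob = null
        }
        send(value)
    }
}

// Hypothetical inputs: one emits immediately, one takes 500 ms.
val fast: Flow<Int> = flowOf(1)
val slow: Flow<String> = flow {
    delay(500)
    emit("loaded")
}

fun main() = runBlocking {
    val combined = combine(
        fast,
        slow.emitInitialAfterTimeout(100, "placeholder")
    ) { n, s -> "$n:$s" }

    println(combined.toList()) // prints "[1:placeholder, 1:loaded]"
}
```

Without the wrapper, combine would emit nothing until slow produced its first value at 500 ms.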
m
Yep, yours looks much nicer. Thanks
Suppose I want to reuse the code for something like:
fun <T> Flow<T>.emitInitialNullOnTimeout(
    timeoutMillis: Long,
): Flow<T?>
But is there any way to specify a type R as T or T? so that it can return Flow<R>?
fun <T, R: T or T?> Flow<T>.emitInitialValueOnTimeout(
    timeoutMillis: Long,
    initialValue: R,
): Flow<R>
Here’s a nice variation, useful for emitting a cached value if that can be obtained before the first value:
fun <T> Flow<T>.emitInitialValueIfBeforeFirst(
    initialValue: suspend () -> T,
): Flow<T> = channelFlow {
    var initialValueJob: Job? = launch {
        val value = initialValue()
        logi("emitting initialValue: $value because first value not yet emitted")
        send(value)
    }
    collect { item ->
        initialValueJob?.let { job ->
            job.cancelAndJoin()
            initialValueJob = null
        }
        send(item)
    }
}
which means I can also use it for the earlier extension function:
fun <T> Flow<T>.emitInitialValueOnTimeout(
    timeoutMillis: Long,
    initialValue: T,
): Flow<T> = emitInitialValueIfBeforeFirst {
    delay(timeoutMillis)
    initialValue
}
r
For the nullable case it should already infer the nullability, but you could make another extension function for convenience.
m
Oh yes, of course! Also means it's probably better to just use emitInitialAfterTimeout() directly.
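A small sketch of the nullable case discussed above. Because Flow is declared as Flow<out T>, passing null as the initial value on a Flow<Int> should let the compiler infer T as Int?. The flow late is hypothetical, and emitInitialNullOnTimeout is the optional convenience wrapper, written here with an explicit type argument as one possible form.

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun <T> Flow<T>.emitInitialAfterTimeout(
    timeoutMillis: Long,
    initialValue: T
): Flow<T> = channelFlow {
    var fallbackJob: Job? = launch {
        delay(timeoutMillis)
        send(initialValue)
    }
    collect { value ->
        fallbackJob?.let { job ->
            job.cancelAndJoin()
            fallbackJob = null
        }
        send(value)
    }
}

// Optional convenience wrapper (hypothetical): fixes the fallback to null.
fun <T> Flow<T>.emitInitialNullOnTimeout(timeoutMillis: Long): Flow<T?> =
    emitInitialAfterTimeout<T?>(timeoutMillis, null)

// Hypothetical non-nullable upstream whose first value arrives after 500 ms.
val late: Flow<Int> = flow {
    delay(500)
    emit(7)
}

fun main() = runBlocking {
    // T is inferred as Int? here, so no separate type parameter R is needed.
    val direct: Flow<Int?> = late.emitInitialAfterTimeout(100, null)
    println(direct.toList())                             // prints "[null, 7]"
    println(late.emitInitialNullOnTimeout(100).toList()) // prints "[null, 7]"
}
```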