According to the docs, when the upstream of a Shar...
# flow
a
According to the docs, when the upstream of a SharedFlow completes, the downstream just remains stuck collecting forever. What do you guys think of this utility to work around this? Code in thread.
Copy code
private sealed interface FlowMessage<out T>
private data class FlowValue<T>(val value: T): FlowMessage<T>
private object EndOfFlow: FlowMessage<Nothing>

fun <T> Flow<T>.shareCompletableIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
) = this
    .map<T, FlowMessage<T>> { FlowValue(it) }
    .onCompletion { emit(EndOfFlow) }
    .shareIn(
        scope = scope,
        started = started,
        replay = replay
    )
    .takeWhile { it !is EndOfFlow }
    .filterIsInstance<FlowValue<T>>()
    .map { it.value }
I think I’ll have a problem with
replay
. On the one hand, I need
replay+1
if the upstream completed, in order to replay the values +
EnfOfFlow
but on the other hand, if the upstream hasn’t completed, I don’t want to replay one extra element.