Alexander Maryanovsky
03/29/2022, 8:06 AMprivate sealed interface FlowMessage<out T>
private data class FlowValue<T>(val value: T): FlowMessage<T>
private object EndOfFlow: FlowMessage<Nothing>
fun <T> Flow<T>.shareCompletableIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
) = this
.map<T, FlowMessage<T>> { FlowValue(it) }
.onCompletion { emit(EndOfFlow) }
.shareIn(
scope = scope,
started = started,
replay = replay
)
.takeWhile { it !is EndOfFlow }
.filterIsInstance<FlowValue<T>>()
.map { it.value }
replay
. On the one hand, I need replay+1
if the upstream completed, in order to replay the values + EnfOfFlow
but on the other hand, if the upstream hasn’t completed, I don’t want to replay one extra element.