How can I transform a `Flow` that sends three stat...
# coroutines
d
How can I transform a
Flow
that sends three states
Starting
,
Progress
and
Finished
(from a
sealed class
), into a
Flow
that assures receiving
Starting
and
Finished
but conflates
Progress
for backpressure on latency of the collector...?
d
Copy code
sealed class State {
    object Starting : State()

    data class Progress(val progress: Int) : State()

    object Finished : State()
}

fun Flow<State>.checkAndConflate(): Flow<State.Progress> {
    return flow<State.Progress> {
        var first = true
        var done = false
        collect {
            check(!done)
            when {
                first -> {
                    check(it is State.Starting)
                    first = false
                }
                it is State.Progress -> {
                    emit(it)
                }
                else -> {
                    check(it is State.Finished)
                    done = true
                }
            }
        }
    }.conflate()
}
d
Thanks! But I really need it to be
Flow<State>
as a result, only with the
Progress
being conflated... (there's some properties I need on
Started
and
Finished
...
d
well, should be easy to adapt my code to just re-emit those items as well
Ah, actually, not really. give me one second.
Copy code
fun Flow<State>.checkAndConflate(): Flow<State> {
    val conflatedProgressFlow = flow<State> {
        var first = true
        var done = false
        collect {
            check(!done)
            when {
                first -> {
                    check(it is State.Starting)
                    first = false
                }
                it is State.Progress -> {
                    emit(it)
                }
                else -> {
                    check(it is State.Finished)
                    done = true
                }
            }
        }
    }.conflate()

    val startFlow = take(1)
    val finishedFlow = dropWhile { it !is State.Finished }

    return conflatedProgressFlow.onStart { emitAll(startFlow) }.onCompletion { emitAll(finishedFlow) }
}
Not sure if it's the optimal way to do this though...
Probably not
Scratch that, definitely not optimal, because it runs the flow three times
d
Yup... this problem is a result of Android's Notifications not being designed to receive too many updates... I really have to use
delay
in the receiver to skip some of the progress, but I need the first and last emissions to display start and finish notification states...
So far, I had to use broadcastIn and filter on multiple subscribers launched on different coroutines to get this to work... not too much cleaner than your solution 🙈...
Thanks for the attempt though 😉!
d
I have another solution, one second 😉
d
Only for the progress emissions... but the first and last ones shouldn't be debounced...
emit(1)
was skipped over there.. and
emit(5)
would have been skipped if it would have some smaller delay...
d
Copy code
fun Flow<State>.checkAndConflate(): Flow<State> {
    return channelFlow<State> {
        val progressChannel = Channel<State>(Channel.CONFLATED)
        launch {
            try {
                collect {
                    if (it is State.Progress) {
                        progressChannel.send(it)
                    } else {
                        this@channelFlow.send(it)
                    }
                }
            } finally {
                progressChannel.close()
                this@channelFlow.close()
            }
        }
        progressChannel.consumeEach {
           this@channelFlow.send(it)
        }

        awaitClose()
    }.buffer(Channel.RENDEZVOUS)
}
This is what I came up with, but it breaks the sequence of elements
Revised again and it works now:
Copy code
fun Flow<State>.checkAndConflate(): Flow<State> {
    return flow<State> {
        coroutineScope {
            val start = CompletableDeferred<State.Starting>()
            val finish = CompletableDeferred<State.Finished>()

            val progressChannel = produce<State.Progress>(capacity = Channel.CONFLATED) {
                collect {
                    when (it) {
                        is State.Starting -> start.complete(it)
                        is State.Progress -> send(it)
                        is State.Finished -> finish.complete(it)
                    }
                }
            }

            emit(start.await())
            progressChannel.consumeEach { emit(it) }
            emit(finish.await())
        }
    }
}
d
That's not bad 👌🏼, so to debounce the progress I would just add :
Copy code
...
is State.Progress -> { delay(timoutMillis); send(it) }
...
Thanks!
d
I guess a
debounceWhen(timeoutMillis) { ... }
operator would be nice.
👍🏼 3