dave08
01/09/2020, 1:03 PMFlow
that sends three states Starting
, Progress
and Finished
(from a sealed class
), into a Flow
that assures receiving Starting
and Finished
but conflates Progress
for backpressure on latency of the collector...?diesieben07
01/09/2020, 1:15 PMsealed class State {
object Starting : State()
data class Progress(val progress: Int) : State()
object Finished : State()
}
fun Flow<State>.checkAndConflate(): Flow<State.Progress> {
return flow<State.Progress> {
var first = true
var done = false
collect {
check(!done)
when {
first -> {
check(it is State.Starting)
first = false
}
it is State.Progress -> {
emit(it)
}
else -> {
check(it is State.Finished)
done = true
}
}
}
}.conflate()
}
dave08
01/09/2020, 1:24 PMFlow<State>
as a result, only with the Progress
being conflated... (there's some properties I need on Started
and Finished
...diesieben07
01/09/2020, 1:27 PMfun Flow<State>.checkAndConflate(): Flow<State> {
val conflatedProgressFlow = flow<State> {
var first = true
var done = false
collect {
check(!done)
when {
first -> {
check(it is State.Starting)
first = false
}
it is State.Progress -> {
emit(it)
}
else -> {
check(it is State.Finished)
done = true
}
}
}
}.conflate()
val startFlow = take(1)
val finishedFlow = dropWhile { it !is State.Finished }
return conflatedProgressFlow.onStart { emitAll(startFlow) }.onCompletion { emitAll(finishedFlow) }
}
Not sure if it's the optimal way to do this though...dave08
01/09/2020, 1:36 PMdelay
in the receiver to skip some of the progress, but I need the first and last emissions to display start and finish notification states...diesieben07
01/09/2020, 1:43 PMDominaezzz
01/09/2020, 1:53 PMdave08
01/09/2020, 1:54 PMemit(1)
was skipped over there.. and emit(5)
would have been skipped if it would have some smaller delay...diesieben07
01/09/2020, 2:07 PMfun Flow<State>.checkAndConflate(): Flow<State> {
return channelFlow<State> {
val progressChannel = Channel<State>(Channel.CONFLATED)
launch {
try {
collect {
if (it is State.Progress) {
progressChannel.send(it)
} else {
this@channelFlow.send(it)
}
}
} finally {
progressChannel.close()
this@channelFlow.close()
}
}
progressChannel.consumeEach {
this@channelFlow.send(it)
}
awaitClose()
}.buffer(Channel.RENDEZVOUS)
}
This is what I came up with, but it breaks the sequence of elementsfun Flow<State>.checkAndConflate(): Flow<State> {
return flow<State> {
coroutineScope {
val start = CompletableDeferred<State.Starting>()
val finish = CompletableDeferred<State.Finished>()
val progressChannel = produce<State.Progress>(capacity = Channel.CONFLATED) {
collect {
when (it) {
is State.Starting -> start.complete(it)
is State.Progress -> send(it)
is State.Finished -> finish.complete(it)
}
}
}
emit(start.await())
progressChannel.consumeEach { emit(it) }
emit(finish.await())
}
}
}
dave08
01/09/2020, 2:20 PM...
is State.Progress -> { delay(timoutMillis); send(it) }
...
Dominaezzz
01/09/2020, 2:21 PMdebounceWhen(timeoutMillis) { ... }
operator would be nice.