tseisel
07/16/2019, 12:50 PM
`Flow`, I am somewhat confused about when I should use `Flow` vs `Channel`s and `actor`s.
I know that `Flow` represents a cold stream of values, but that stream can also be generated from a hot source, making it a deferrable hot stream.
Do you have some specific use cases where you'd use `Channel`s over `Flow`, and why? It seems to me that `Flow` can handle most use cases.

elizarov
07/16/2019, 1:50 PM
`Flow`. It is more performant and safer to use. There are not that many use cases for channels that cannot be easily replaced by flow. If flow fits you, then yours is not among those.

tseisel
07/16/2019, 2:13 PM
`Flow` can solve:
Given a `Flow<List<String>>`, I'd like to:
1. Diff each list with the previously received one,
2. Perform some actions with the difference,
3. Let multiple clients retrieve the last collected list at a given time, in a thread-safe way (`ConflatedBroadcastChannel`?).
I'd like the flow to be collected only when the first client requests a list. Is it possible with `Flow`?
I currently have an implementation that uses actors, but it looks cumbersome.

Zach Klippenstein (he/him) [MOD]
07/16/2019, 4:06 PM

    val finalFlow = theFlow
        .scanReduce { old, new ->
            val diff = computeDiff(old, new)
            processDiff(diff)
            return@scanReduce new
        }

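To make the mechanics of the snippet above concrete, here is a minimal sketch of the same "compare each value with the previous one" idea, written with only the `flow { }` builder and `collect` so that it does not depend on `scanReduce` being available. The operator name `onEachPair` is hypothetical and not part of kotlinx.coroutines, and the diff is a plain list subtraction chosen for illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical operator (not a library function): invokes onPair with each
// pair of consecutive values and re-emits every value unchanged.
fun <T : Any> Flow<T>.onEachPair(
    onPair: suspend (previous: T, current: T) -> Unit
): Flow<T> = flow {
    var previous: T? = null
    collect { current ->
        // Skipped for the very first value, which has no predecessor.
        previous?.let { onPair(it, current) }
        previous = current
        emit(current)
    }
}

fun main(): Unit = runBlocking {
    val lists = flowOf(listOf("a"), listOf("a", "b"), listOf("b"))
    lists
        .onEachPair { old, new -> println("added=${new - old}, removed=${old - new}") }
        .collect()
    // prints:
    // added=[b], removed=[]
    // added=[], removed=[a]
}
```

Like the `scanReduce` version, nothing runs until the resulting flow is collected, so the diffing stays cold.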
For 3, you can use the `conflate()` operator, but it also requires multicasting/sharing, for which there is currently no operator; one is proposed in https://github.com/Kotlin/kotlinx.coroutines/issues/1261.

tseisel
07/16/2019, 5:56 PM
`scanReduce` is exactly what I was looking for 🤩
For 3, I thought of creating a `ConflatedBroadcastChannel` with `broadcast(capacity = Channel.CONFLATED, start = CoroutineStart.LAZY)` that collects from the source `Flow`:

    class MyRepository(
        private val scope: CoroutineScope,
        private val dao: MyDao
    ) {
        private val channel: BroadcastChannel<List<String>> = dao.flow
            .scanReduce { original: List<String>, modified: List<String> ->
                val (added, deleted) = diffList(original, modified)
                println("Added: $added, deleted: $deleted")
                return@scanReduce modified
            }.let { flow ->
                // LAZY start: intended to defer collecting the source flow until the first subscription.
                scope.broadcast(capacity = Channel.CONFLATED, start = CoroutineStart.LAZY) {
                    flow.collect { send(it) }
                }
            }

        suspend fun getLatestList(): List<String> = channel.openSubscription().consume { receive() }
    }

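The `diffList` helper called in the class above is not shown in the thread. One possible shape for it, offered purely as an assumption, is a function that returns the added and deleted elements as a `Pair`, which is what the destructuring `val (added, deleted)` expects:

```kotlin
// Hypothetical implementation of the diffList helper; the real one is not
// shown in the conversation. Returns (added, deleted) relative to original.
fun <T> diffList(original: List<T>, modified: List<T>): Pair<List<T>, List<T>> {
    // Sets give constant-time membership checks for the two filters below.
    val originalSet = original.toSet()
    val modifiedSet = modified.toSet()
    val added = modified.filter { it !in originalSet }
    val deleted = original.filter { it !in modifiedSet }
    return added to deleted
}

fun main() {
    val (added, deleted) = diffList(listOf("a", "b"), listOf("b", "c"))
    println("Added: $added, deleted: $deleted") // prints "Added: [c], deleted: [a]"
}
```

This version ignores element order and duplicates; a position-aware diff would need a different algorithm.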
In case the source flow fails with an exception, calling `getLatestList` will rethrow that exception, but subsequent calls to `getLatestList` will not re-open the failed `BroadcastChannel`, which is something that I'd want. Any idea?

elizarov
07/16/2019, 6:33 PM
`flow.broadcastIn(scope)` to simplify your code
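
For readers on a newer kotlinx.coroutines release than the one discussed in this thread: the "share the latest list, start lazily" requirement can also be sketched with the `shareIn` operator, assuming version 1.4 or later is available. The class name `LatestListRepository` and its constructor parameters are hypothetical. This sketch does not cover re-opening after a failure.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical repository; assumes kotlinx.coroutines 1.4+ for shareIn.
class LatestListRepository(
    scope: CoroutineScope,
    source: Flow<List<String>>
) {
    // SharingStarted.Lazily: the source is collected once the first subscriber appears.
    // replay = 1: later subscribers immediately receive the most recent list.
    private val shared: SharedFlow<List<String>> =
        source.shareIn(scope, SharingStarted.Lazily, replay = 1)

    // Suspends until a list is available, then returns it.
    suspend fun getLatestList(): List<String> = shared.first()
}

fun main(): Unit = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)
    val repository = LatestListRepository(scope, flowOf(listOf("x")))
    println(repository.getLatestList())
    // The sharing coroutine never completes on its own, so cancel its scope.
    scope.cancel()
}
```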