# coroutines
t
Since the inception of `Flow`, I am somewhat confused about when I should use `Flow` vs. `Channel`s and `actor`s. I know that `Flow` represents a cold stream of values, but that stream can also be generated from a hot source, making it a deferrable hot stream. Do you have some specific use cases where you'd use `Channel`s over `Flow`, and why? It seems to me that `Flow` can handle most use cases.
e
If it handles your use cases, then use `Flow`. It is more performant and safer to use. There are not that many use cases for channels that cannot be easily replaced by flow. If flow fits you, then yours is not among those.
👍 1
t
I have a use case I'm not sure `Flow` can solve. Given a `Flow<List<String>>`, I'd like to: 1. Diff each list with the previously received one, 2. Perform some actions with the difference, 3. Let multiple clients retrieve the last collected list at a given time, in a thread-safe way (`ConflatedBroadcastChannel`?). I'd like the flow to be collected only when the first client requests a list. Is it possible with `Flow`? I currently have an implementation that uses actors, but it looks cumbersome.
z
1 & 2 can be accomplished with `scanReduce`:
```kotlin
val finalFlow = theFlow
    .scanReduce { old, new ->
        val diff = computeDiff(old, new)
        processDiff(diff)
        // Return the new list so it becomes `old` for the next emission.
        return@scanReduce new
    }
```
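To make the operator's behaviour concrete, here is a minimal sketch. It assumes kotlinx.coroutines 1.4.0 or later, where `scanReduce` is named `runningReduce`. The `transitions` function is a hypothetical helper that exists only for this illustration. It records each `(old, new)` pair the lambda receives.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.runningReduce
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records every (old, new) pair passed to the lambda.
fun transitions(source: Flow<List<String>>): List<Pair<List<String>, List<String>>> = runBlocking {
    val seen = mutableListOf<Pair<List<String>, List<String>>>()
    source
        .runningReduce { old, new ->
            seen += old to new // the lambda runs from the second element onward
            new                // the returned value becomes `old` next time
        }
        .collect()
    seen
}

fun main() {
    val pairs = transitions(flowOf(listOf("a"), listOf("a", "b"), listOf("b")))
    pairs.forEach { (old, new) -> println("$old -> $new") }
}
```

The first list is passed through without invoking the lambda, so a diff is only computed once there is a previous list to compare against.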
For 3, you can use the `conflate()` operator, but it also requires multicasting/sharing, for which there is currently no operator, although one is proposed in https://github.com/Kotlin/kotlinx.coroutines/issues/1261.
👍 1
I’m not exactly sure how you would implement lazy start with this share operator; it would depend on whether it saves the cached value between upstream collections.
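For readers on newer library versions: kotlinx.coroutines 1.4.0 and later ship a `shareIn` operator, which did not exist when this thread was written. The following is a sketch of how lazy start plus "latest value" could look with it, not a statement of how the proposal above was meant to work. `LatestListRepository` and `upstreamStartCounts` are hypothetical names used only for illustration.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.runBlocking

// Hypothetical repository: shares one upstream collection among all callers.
class LatestListRepository(scope: CoroutineScope, source: Flow<List<String>>) {
    // Lazily: the upstream is collected only once the first subscriber appears.
    // replay = 1: later subscribers immediately receive the most recent list.
    private val shared: SharedFlow<List<String>> =
        source.shareIn(scope, SharingStarted.Lazily, replay = 1)

    suspend fun getLatestList(): List<String> = shared.first()
}

// Returns how often the upstream was started before and after two requests.
fun upstreamStartCounts(): Pair<Int, Int> = runBlocking {
    var starts = 0
    val source = flow {
        starts++
        emit(listOf("a"))
        emit(listOf("a", "b"))
    }
    // Separate Job so the sharing coroutine can be cancelled independently.
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val repo = LatestListRepository(sharingScope, source)

    val before = starts
    repo.getLatestList()
    repo.getLatestList()
    val after = starts

    sharingScope.cancel()
    before to after
}

fun main() {
    println(upstreamStartCounts())
}
```

With `SharingStarted.Lazily` the upstream is never restarted once it has begun, so the replayed value is kept for as long as the sharing scope is alive.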
t
Thanks @Zach Klippenstein (he/him) [MOD], `scanReduce` is exactly what I was looking for 🤩 For 3, I thought of creating a `ConflatedBroadcastChannel` with `broadcast(capacity = Channel.CONFLATED, start = CoroutineStart.LAZY)` that collects from the source `Flow`:
```kotlin
class MyRepository(
    private val scope: CoroutineScope,
    private val dao: MyDao
) {
    private val channel: BroadcastChannel<List<String>> = dao.flow
        .scanReduce { original: List<String>, modified: List<String> ->
            val (added, deleted) = diffList(original, modified)
            println("Added: $added, deleted: $deleted")
            return@scanReduce modified
        }.let { flow ->
            // LAZY start: the source flow is collected only after the first subscription is opened.
            scope.broadcast(capacity = Channel.CONFLATED, start = CoroutineStart.LAZY) {
                flow.collect { send(it) }
            }
        }

    // Opens a short-lived subscription, reads the latest list, then cancels the subscription.
    suspend fun getLatestList(): List<String> = channel.openSubscription().consume { receive() }
}
```
In case the source flow fails with an exception, calling `getLatestList` will rethrow that exception, but subsequent calls to `getLatestList` will not re-open the failed `BroadcastChannel`, which is something that I'd want. Any ideas?
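The `diffList` helper used in the repository snippet is not shown in the thread. The following is a minimal sketch of what it might look like, assuming it returns the added and deleted elements as a `Pair` and that element order and duplicates do not matter.

```kotlin
// Hypothetical implementation: the original helper is not shown in the thread.
// Returns (added, deleted) relative to `original`.
fun diffList(original: List<String>, modified: List<String>): Pair<List<String>, List<String>> {
    // Sets give constant-time membership checks for the two filters below.
    val before = original.toSet()
    val after = modified.toSet()
    val added = modified.filter { it !in before }
    val deleted = original.filter { it !in after }
    return added to deleted
}

fun main() {
    val (added, deleted) = diffList(listOf("a", "b"), listOf("b", "c"))
    println("Added: $added, deleted: $deleted") // Added: [c], deleted: [a]
}
```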
e
You can also use `flow.broadcastIn(scope)` to simplify your code.
👍 1