zak.taccardi
06/12/2019, 2:38 PMactor
+ a ConflatedBroadcastChannel
to safely update state and observe changes to that state. Does this pattern change at all with the introduction of Flow
?
class CountRepository: CoroutineScope by CoroutineScope(Dispatchers.Default) {
private val stateChannel = ConflatedBroadcastChannel<Int>(0)
private val actor = actor<Int>(capacity = Channel.UNLIMITED) {
var count = stateChannel.value
for (message in channel) {
count += message
stateChannel.send(count)
}
}
fun states(): ReceiveChannel<Int> = stateChannel.openSubscription()
fun increment() {
actor.offer(1)
}
fun decrement() {
actor.offer(-1)
}
}
Zach Klippenstein (he/him) [MOD]
06/12/2019, 4:34 PMfun states()
return a Flow<Int>
instead of a `ReceiveChannel`:
fun states(): Flow<Int> = stateChannel.asFlow()
This is nicer because the caller could forget to cancel the channel, but the Flow will always either complete or get cancelled.zak.taccardi
06/12/2019, 4:34 PMbloder
06/12/2019, 5:24 PMConflatedBroadcastChannel
I can consume data with consumeEach
then I can consume a value every time I send a value to it with no need of a new consume
call or a new subscription opening, can I do the same thing with Flow
? As I know to consume flow values I need to explicit open a subscription with collect
, I'm new with channels and flows then probably I'm missing something.Zach Klippenstein (he/him) [MOD]
06/12/2019, 5:28 PMconsumeEach
just runs a for loop that calls your lambda with every value received from the channel, and then ensures the channel is cancelled afterwards. Flow.collect
behaves very similarly from the consumers point of view – it also calls your lambda with every value emitted by the flow. collect
also manages both sides of the subscription though – when you call collect
in this case, it implicitly calls openSubscription
, and then cancels that subscription before returning.bloder
06/12/2019, 5:35 PMfun states(): Flow<Int> = stateChannel.asFlow()
like states().collect { println(it) }
and I send a new value to stateChannel
(stateChannel.send(count)
) the value will be consumed by that collect
?Zach Klippenstein (he/him) [MOD]
06/12/2019, 6:11 PMcollect
won’t return until you close stateChannel
, and if the coroutine from which you’re calling collect
is cancelled (or your lambda throws an exception), it will cancel the subscription to the ConflatedBroadcastChannel
before rethrowing the exception.bloder
06/12/2019, 6:13 PMbohsen
06/13/2019, 6:24 AM