zak.taccardi
06/12/2019, 2:38 PM
I use an `actor` + a `ConflatedBroadcastChannel` to safely update state and observe changes to that state. Does this pattern change at all with the introduction of `Flow`?
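    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.channels.Channel
    import kotlinx.coroutines.channels.ConflatedBroadcastChannel
    import kotlinx.coroutines.channels.ReceiveChannel
    import kotlinx.coroutines.channels.actor

    class CountRepository : CoroutineScope by CoroutineScope(Dispatchers.Default) {
        // Holds the latest count; new subscribers immediately receive the current value.
        private val stateChannel = ConflatedBroadcastChannel<Int>(0)

        // All mutations go through this actor, so `count` is only touched by one coroutine.
        private val actor = actor<Int>(capacity = Channel.UNLIMITED) {
            var count = stateChannel.value
            for (message in channel) {
                count += message
                stateChannel.send(count)
            }
        }

        // Callers must cancel the returned subscription when they are done with it.
        fun states(): ReceiveChannel<Int> = stateChannel.openSubscription()

        fun increment() {
            actor.offer(1)
        }

        fun decrement() {
            actor.offer(-1)
        }
    }
Zach Klippenstein (he/him) [MOD]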
06/12/2019, 4:34 PM
You could have `fun states()` return a `Flow<Int>` instead of a `ReceiveChannel`:
    fun states(): Flow<Int> = stateChannel.asFlow()
This is nicer because a caller of the `ReceiveChannel` version could forget to cancel the subscription, whereas collecting the `Flow` will always either complete or get cancelled.
zak.taccardi
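To illustrate the point about forgetting to cancel, here is a minimal sketch comparing the two return types from the caller's side. It assumes a kotlinx.coroutines release in which `ConflatedBroadcastChannel` and `BroadcastChannel.asFlow()` are still available (both were later deprecated in favor of `StateFlow`). `currentViaChannel` and `currentViaFlow` are hypothetical helper names, not part of the repository above.

```kotlin
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Stand-in for the repository's private state channel, starting at 0 (assumption).
val stateChannel = ConflatedBroadcastChannel<Int>(0)

// Hypothetical helper: reads the current value through a ReceiveChannel.
// The caller is responsible for cancelling the subscription.
suspend fun currentViaChannel(): Int {
    val subscription = stateChannel.openSubscription()
    try {
        return subscription.receive()
    } finally {
        subscription.cancel() // easy to forget; without it the subscription stays open
    }
}

// Hypothetical helper: reads the current value through a Flow.
// first() subscribes, takes one value, and cancels the subscription itself.
suspend fun currentViaFlow(): Int = stateChannel.asFlow().first()

fun main() = runBlocking {
    println(currentViaChannel())
    println(currentViaFlow())
}
```

With the channel version the `try`/`finally` is the caller's job; with the flow version there is nothing left for the caller to clean up.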
06/12/2019, 4:34 PM
bloder
06/12/2019, 5:24 PM
With `ConflatedBroadcastChannel` I can consume data with `consumeEach`, and then I receive a value every time I send one to it, without needing a new consume call or opening a new subscription. Can I do the same thing with `Flow`? As far as I know, to consume flow values I need to explicitly open a subscription with `collect`. I'm new to channels and flows, so I'm probably missing something.
Zach Klippenstein (he/him) [MOD]
06/12/2019, 5:28 PM
`consumeEach` just runs a for loop that calls your lambda with every value received from the channel, and then ensures the channel is cancelled afterwards. `Flow.collect` behaves very similarly from the consumer's point of view: it also calls your lambda with every value emitted by the flow. `collect` also manages both sides of the subscription, though: when you call `collect` in this case, it implicitly calls `openSubscription`, and then cancels that subscription before returning.
bloder
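The behavior described above can be sketched roughly as follows. `asFlowSketch` is a hypothetical stand-in written for illustration, not the library's actual implementation of `asFlow()`, and it assumes a kotlinx.coroutines release in which `BroadcastChannel` is still available.

```kotlin
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical illustration of a flow that owns its subscription:
// every collect call opens one, forwards each value, and always cancels it.
fun <T> BroadcastChannel<T>.asFlowSketch(): Flow<T> = flow {
    val subscription = openSubscription()
    try {
        for (value in subscription) emit(value)
    } finally {
        // Runs when the channel closes, the collector is cancelled, or the lambda throws.
        subscription.cancel()
    }
}

fun main() = runBlocking {
    val stateChannel = ConflatedBroadcastChannel<Int>(42)
    // first() stops collecting after one value, which triggers the finally block.
    println(stateChannel.asFlowSketch().first())
}
```

The `for` loop plus `finally` is what `consumeEach` does on the consumer side; wrapping it in `flow { }` moves the `openSubscription` call inside as well, so each `collect` gets its own subscription.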
06/12/2019, 5:35 PM
So if I collect `fun states(): Flow<Int> = stateChannel.asFlow()` like `states().collect { println(it) }` and I send a new value to `stateChannel` (`stateChannel.send(count)`), the value will be consumed by that `collect`?
Zach Klippenstein (he/him) [MOD]
06/12/2019, 6:11 PM
`collect` won’t return until you close `stateChannel`, and if the coroutine from which you’re calling `collect` is cancelled (or your lambda throws an exception), it will cancel the subscription to the `ConflatedBroadcastChannel` before rethrowing the exception.
bloder
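A minimal sketch of the two ways `collect` can end, as described above: the channel is closed, or the collecting coroutine is cancelled. It assumes a kotlinx.coroutines release in which `ConflatedBroadcastChannel` and `BroadcastChannel.asFlow()` are still available. `collectUntilClosed` and `collectUntilCancelled` are hypothetical helper names.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects until the channel is closed, then returns what was seen.
suspend fun collectUntilClosed(): List<Int> = coroutineScope {
    val stateChannel = ConflatedBroadcastChannel<Int>(0)
    val seen = mutableListOf<Int>()
    val collector = launch {
        stateChannel.asFlow().collect { seen += it }
        // Reached only once stateChannel has been closed.
    }
    yield()               // on runBlocking's single thread, lets the collector subscribe first
    stateChannel.send(1)  // offered to the collector that is already running
    stateChannel.close()  // ends the flow, so collect returns normally
    collector.join()
    seen
}

// Hypothetical helper: cancels the collecting coroutine instead of closing the channel.
suspend fun collectUntilCancelled(): Boolean = coroutineScope {
    val stateChannel = ConflatedBroadcastChannel<Int>(0)
    val collector = launch { stateChannel.asFlow().collect { } }
    yield()
    collector.cancelAndJoin()       // ends collect; the channel itself is untouched
    !stateChannel.isClosedForSend   // true: the channel is still open for new values
}

fun main() = runBlocking {
    println(collectUntilClosed())
    println(collectUntilCancelled())
}
```

Because the channel is conflated, a slow collector may skip intermediate values, so the exact contents of the first list depend on timing.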
06/12/2019, 6:13 PM
bohsen
06/13/2019, 6:24 AM