# coroutines
z
So I use an `actor` + a `ConflatedBroadcastChannel` to safely update state and observe changes to that state. Does this pattern change at all with the introduction of `Flow`?
class CountRepository: CoroutineScope by CoroutineScope(Dispatchers.Default) {

    private val stateChannel = ConflatedBroadcastChannel<Int>(0)
    
    private val actor = actor<Int>(capacity = Channel.UNLIMITED) {
        var count = stateChannel.value

        for (message in channel) {
            count += message
            
            stateChannel.send(count)
        }
    }

    fun states(): ReceiveChannel<Int> = stateChannel.openSubscription()

    fun increment() {
        actor.offer(1)
    }

    fun decrement() {
        actor.offer(-1)
    }
}
z
You would probably want to make `fun states()` return a `Flow<Int>` instead of a `ReceiveChannel`:
fun states(): Flow<Int> = stateChannel.asFlow()
This is nicer because the caller could forget to cancel the channel, but the Flow will always either complete or get cancelled.
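A minimal sketch of that difference, assuming a kotlinx.coroutines version in which `ConflatedBroadcastChannel` and `BroadcastChannel.asFlow()` are still available (both were later deprecated in favor of `StateFlow`). The helper names `currentViaChannel` and `currentViaFlow` are hypothetical and exist only for illustration:

```kotlin
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical helper: reads the current state through a ReceiveChannel.
// The caller owns the subscription, so it has to cancel it explicitly.
suspend fun currentViaChannel(stateChannel: ConflatedBroadcastChannel<Int>): Int {
    val subscription = stateChannel.openSubscription()
    try {
        return subscription.receive()
    } finally {
        subscription.cancel() // easy to forget; the subscription stays registered otherwise
    }
}

// Hypothetical helper: the same read through a Flow.
// The flow opens the subscription when collected and cancels it when first() is done.
suspend fun currentViaFlow(stateChannel: ConflatedBroadcastChannel<Int>): Int =
    stateChannel.asFlow().first()

fun main() = runBlocking<Unit> {
    val stateChannel = ConflatedBroadcastChannel(41)
    println(currentViaChannel(stateChannel)) // 41
    println(currentViaFlow(stateChannel))    // 41
}
```

With the Flow version there is no subscription object for the caller to hold on to, so there is nothing to forget to cancel.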
z
ah, so the pattern is basically the same
b
@Zach Klippenstein (he/him) [MOD] One doubt about it: with a `ConflatedBroadcastChannel` I can consume data with `consumeEach`, and then I consume a value every time I send one to it, with no need for a new `consume` call or a new subscription. Can I do the same thing with `Flow`? As far as I know, to consume flow values I need to explicitly open a subscription with `collect`. I'm new to channels and flows, so I'm probably missing something.
z
`consumeEach` just runs a for loop that calls your lambda with every value received from the channel, and then ensures the channel is cancelled afterwards. `Flow.collect` behaves very similarly from the consumer's point of view: it also calls your lambda with every value emitted by the flow. `collect` also manages both sides of the subscription though: when you call `collect` in this case, it implicitly calls `openSubscription`, and then cancels that subscription before returning.
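To put the two styles side by side, here is a rough sketch. It assumes a kotlinx.coroutines version that still ships `ConflatedBroadcastChannel`, and it relies on `runBlocking`'s single thread so that `yield()` hands control to the observers. `observeBothWays` is a hypothetical name used only for this demo:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: observes the same channel both ways and
// returns what each observer saw.
suspend fun observeBothWays(): Pair<List<Int>, List<Int>> = coroutineScope {
    val stateChannel = ConflatedBroadcastChannel(0)
    val viaChannel = mutableListOf<Int>()
    val viaFlow = mutableListOf<Int>()

    // Channel style: open the subscription yourself;
    // consumeEach loops over it and cancels it afterwards.
    val channelObserver = launch {
        stateChannel.openSubscription().consumeEach { viaChannel += it }
    }
    // Flow style: collect opens the subscription for you
    // and cancels it before returning.
    val flowObserver = launch {
        stateChannel.asFlow().collect { viaFlow += it }
    }

    yield()              // let both observers subscribe
    stateChannel.send(1) // a single send reaches every open subscription
    yield()              // let both observers handle the update
    stateChannel.close() // ends both loops
    joinAll(channelObserver, flowObserver)
    viaChannel to viaFlow
}

fun main() = runBlocking<Unit> {
    val (viaChannel, viaFlow) = observeBothWays()
    println("consumeEach saw $viaChannel")
    println("collect saw $viaFlow")
}
```

In both cases one long-running call keeps receiving values, so no new `collect` is needed per value sent.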
b
oh... nice, then following this example, if I have a place consuming `fun states(): Flow<Int> = stateChannel.asFlow()` like `states().collect { println(it) }` and I send a new value to `stateChannel` (`stateChannel.send(count)`), the value will be consumed by that `collect`?
z
Correct. `collect` won’t return until you close `stateChannel`, and if the coroutine from which you’re calling `collect` is cancelled (or your lambda throws an exception), it will cancel the subscription to the `ConflatedBroadcastChannel` before rethrowing the exception.
🙏 1
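The two ways a `collect` call can end can be sketched roughly as follows. This again assumes a kotlinx.coroutines version where `ConflatedBroadcastChannel` is available and a single-threaded `runBlocking` caller; `observe` and its `endByClosing` parameter are hypothetical names for this illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: records what happens around a collect call that is
// ended either by closing the channel or by cancelling the collector.
suspend fun observe(endByClosing: Boolean): List<String> = coroutineScope {
    val events = mutableListOf<String>()
    val stateChannel = ConflatedBroadcastChannel(0)

    val collector = launch {
        try {
            stateChannel.asFlow().collect { events += "state $it" }
            // Reached only when collect returns normally (channel closed).
            events += "collect returned"
        } finally {
            // Runs on both normal return and cancellation.
            events += "collector finished"
        }
    }

    yield()              // let the collector subscribe
    stateChannel.send(1)
    yield()              // let the collector handle the update

    if (endByClosing) {
        stateChannel.close()      // collect returns normally
        collector.join()
    } else {
        collector.cancelAndJoin() // collect is interrupted by cancellation
    }
    events
}

fun main() = runBlocking<Unit> {
    println(observe(endByClosing = true))
    println(observe(endByClosing = false))
}
```

Only the closing run should record `collect returned`; the cancelled run leaves `collect` through the cancellation exception instead.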
b
nice! that's what I was looking for 🙂 thank you!
👍 1
b
@Zach Klippenstein (he/him) [MOD] Excellent explanations 👏