Basically I want 1 single consumer dictating the b...
# flow
g
Basically I want 1 single consumer dictating the backpressure, and potentially multiple producers adding events to it. I was looking at the Actor builder but it I'm not sure if that's the best fit or why it's obsolete.
g
If I understood correctly, you’re looking for Channels. Actors are a good fit if you’re planning to handle mutable state inside the consumer, otherwise I would stick with Channels. SharedFlows is useful for multiple consumers, not multiple producers.
👍 1
g
Can you clarify a bit more what you mean with mutable state inside the de consumer?
g
Sure! Working with Flows often encourages the management of immutable state — although Flows allow some level of mutable code in specific cases, they don't provide features to manage state out-of-the-box. What do I mean by those specific cases? Let me exemplify — image a Flow that sums all integers. Independently of how many producers the Flow has, you can do something like this:
Copy code
suspend fun Flow<Int>.sum(): Int {
    var acc = 0
    collect { acc = acc + it }
    return acc
}
The reason you're allowed to do that is due to Flow's cold nature: the
collect
function is called one by one, as the elements are produced by upstream. And it does work well for many other cases:
Copy code
suspend fun <T> Flow<T>.last(): T {
    var last: T? = null
    collect { last = it }
    return checkNotNull(last)
}

suspend fun <T> Flow<T>.size(): Int {
    var size = 0
    collect { ++size }
    return size
}
But, imagine a scenario a bit more complicated — you have to build a state machine that controls order requests. Each order has four possible states: created, processing, paid and not paid. You want all the backpressure goodies and stuff, but for each single order, and not globally — you don't want to delay some order due to another one. While you can use Flows to achieve that, you'll probably rely on some other structure, probably a Mutex, and fast enough your solution won't be that simple anymore. Actors, on the other hand, are designed for scenarios like this — handling mutable state is one of its core capabilities:
Copy code
sealed class OrderMessage {
    object ProcessOrder : OrderMessage()
    data class UpdateOrderStatus(val newStatus: Status) : OrderMessage()
}

fun CoroutineScope.orderActor(initialOrder: Order) = actor<OrderMessage> {
    var order = initialOrder // the order itself is mutable

    for (msg in channel) {
        when (msg) {
            is OrderMessage.ProcessOrder -> {
                order = order.copy(status = Status.PROCESSING)
                println("Order ${order.id} is now ${order.status}")
                processOrder(channel) // passing the actor channel itself as reference
            }
            is OrderMessage.UpdateOrderStatus -> {
                order = order.copy(status = msg.newStatus)
                println("Order ${order.id} is now ${order.status}")
            }
        }
    }
}

suspend fun processOrder(callback: SendChannel<OrderMessage>) {
    val paymentProcessed = (1..2).random() == 1
    val newStatus = if (paymentProcessed) Status.PAID else Status.NOT_PAID
    launch { 
        delay(1000) // Simulate async processing
        callback.send(OrderMessage.UpdateOrderStatus(newStatus))
    } // Send message back to actor
}
It's good to mention, however, that actors are basically channels with the actor model nomenclature. Since the actor package is obsolete in Kotlin, I would recommend for you to stick to Channels, since they can behave in a similar manner with minimal adjustments.
g
Ah yeah, this is basically what I was looking into because the Actor is a State Machine, so it's internal state would be mutable by the handling of the Channel events. But like you said, this is just a wrapper on the regular Channel behaviour. Thx!
But note that the events themselves are not mutable, so something like
channel._consumeAsFlow_()
feels semantically correct to me too.