galvas
02/05/2024, 10:08 AMgabfssilva
02/22/2024, 10:52 AMgalvas
02/22/2024, 2:59 PMgabfssilva
02/22/2024, 5:10 PMsuspend fun Flow<Int>.sum(): Int {
var acc = 0
collect { acc = acc + it }
return acc
}
The reason you're allowed to do that is due to Flow's cold nature: the collect
function is called one by one, as the elements are produced by upstream.
And it does work well for many other cases:
suspend fun <T> Flow<T>.last(): T {
var last: T? = null
collect { last = it }
return checkNotNull(last)
}
suspend fun <T> Flow<T>.size(): Int {
var size = 0
collect { ++size }
return size
}
But, imagine a scenario a bit more complicated — you have to build a state machine that controls order requests. Each order has four possible states: created, processing, paid and not paid.
You want all the backpressure goodies and stuff, but for each single order, and not globally — you don't want to delay some order due to another one.
While you can use Flows to achieve that, you'll probably rely on some other structure, probably a Mutex, and fast enough your solution won't be that simple anymore.
Actors, on the other hand, are designed for scenarios like this — handling mutable state is one of its core capabilities:
sealed class OrderMessage {
object ProcessOrder : OrderMessage()
data class UpdateOrderStatus(val newStatus: Status) : OrderMessage()
}
fun CoroutineScope.orderActor(initialOrder: Order) = actor<OrderMessage> {
var order = initialOrder // the order itself is mutable
for (msg in channel) {
when (msg) {
is OrderMessage.ProcessOrder -> {
order = order.copy(status = Status.PROCESSING)
println("Order ${order.id} is now ${order.status}")
processOrder(channel) // passing the actor channel itself as reference
}
is OrderMessage.UpdateOrderStatus -> {
order = order.copy(status = msg.newStatus)
println("Order ${order.id} is now ${order.status}")
}
}
}
}
suspend fun processOrder(callback: SendChannel<OrderMessage>) {
val paymentProcessed = (1..2).random() == 1
val newStatus = if (paymentProcessed) Status.PAID else Status.NOT_PAID
launch {
delay(1000) // Simulate async processing
callback.send(OrderMessage.UpdateOrderStatus(newStatus))
} // Send message back to actor
}
It's good to mention, however, that actors are basically channels with the actor model nomenclature. Since the actor package is obsolete in Kotlin, I would recommend for you to stick to Channels, since they can behave in a similar manner with minimal adjustments.galvas
02/23/2024, 9:22 AMgalvas
02/23/2024, 9:24 AMchannel._consumeAsFlow_()
feels semantically correct to me too.