I've got a stream of items that need to be process...
# coroutines
r
I've got a stream of items that need to be processed sequentially, but I'd like to speed things up by breaking up and parallelizing the processing steps. Currently I'm explicitly passing each item to the next "step" via an actor.
Copy code
val step3Channel = actor { /*processing..*/ println("finished!") }
val step2Channel = actor { /*processing..*/ step3Channel.send(result) }
val step1Channel = actor { /*processing..*/ step2Channel.send(result) }
This becomes cumbersome when the item passed to each channel has a lot of fields and there's no way to "short-circuit" and skip any of the steps. Is there a better way of doing this?
k
What type is the “stream of items”?
Flow
?
List
?
If Flow, something like this could work:
Copy code
private fun <T, R> Flow<T>.blah(mapping: suspend (T) -> R): Flow<Deferred<R>> {
    return channelFlow {
        this@blah.collect { item ->
            val itemMapping = async { mapping(item) }
            send(itemMapping)
        }
    }.buffer()
}
r
it's a channel, I'm receiving items from a broadcaster receiver
Copy code
override fun onReceive(context: Context, broadcastIntent: Intent?) {
    ..
    channel.trySend(item)
}
k
Copy code
channel.consumeAsFlow()
r
so a
Flow<Deferred<R>>
?
k
Yes, that will concurrently process elements from
(T) -> R
and you can await them as you desire in the original order they were sent in
r
oh I think
buffer()
is the real magic here though, I hadn't considered that operator yet.
wow this is a lifesaver, thanks!
k
If you want to limit the concurrency of
mapping
, you should use a semaphor.
Copy code
private fun <T, R> Flow<T>.blah(maxConcurrency: Int, mapping: suspend (T) -> R): Flow<Deferred<R>> {
    val semaphor = Semaphor(maxConcurrency)
    return channelFlow {
        this@blah.collect { item ->
            val itemMapping = async { 
                semaphor.withPermit { mapping(item) } 
            }
            send(itemMapping)
        }
    }.buffer()
}
👍 1
No problem!