reline
04/18/2024, 4:41 PMval step3Channel = actor { /*processing..*/ println("finished!") }
val step2Channel = actor { /*processing..*/ step3Channel.send(result) }
val step1Channel = actor { /*processing..*/ step2Channel.send(result) }
This becomes cumbersome when the item passed to each channel has a lot of fields and there's no way to "short-circuit" and skip any of the steps.
Is there a better way of doing this?kevin.cianfarini
04/18/2024, 4:44 PMFlow
? List
?kevin.cianfarini
04/18/2024, 4:50 PMprivate fun <T, R> Flow<T>.blah(mapping: suspend (T) -> R): Flow<Deferred<R>> {
return channelFlow {
this@blah.collect { item ->
val itemMapping = async { mapping(item) }
send(itemMapping)
}
}.buffer()
}
reline
04/18/2024, 4:51 PMoverride fun onReceive(context: Context, broadcastIntent: Intent?) {
..
channel.trySend(item)
}
kevin.cianfarini
04/18/2024, 4:52 PMchannel.consumeAsFlow()
reline
04/18/2024, 4:54 PMFlow<Deferred<R>>
?kevin.cianfarini
04/18/2024, 4:57 PM(T) -> R
and you can await them as you desire in the original order they were sent inreline
04/18/2024, 4:57 PMbuffer()
is the real magic here though, I hadn't considered that operator yet.reline
04/18/2024, 4:58 PMkevin.cianfarini
04/18/2024, 4:59 PMmapping
, you should use a semaphor.
private fun <T, R> Flow<T>.blah(maxConcurrency: Int, mapping: suspend (T) -> R): Flow<Deferred<R>> {
val semaphor = Semaphor(maxConcurrency)
return channelFlow {
this@blah.collect { item ->
val itemMapping = async {
semaphor.withPermit { mapping(item) }
}
send(itemMapping)
}
}.buffer()
}
kevin.cianfarini
04/18/2024, 4:59 PM