expensivebelly
07/21/2021, 11:17 AM
Is there an equivalent of Observable.concatArrayEager(...) in Flow?
Nick Allen
08/02/2021, 11:00 PM
You can make a Flow "eager" with the shareIn operator. I think this would work:
fun <T> concatEager(scope: CoroutineScope, vararg flows: Flow<T>): Flow<T> =
    flows.asFlow()
        .flatMapConcat { it.shareIn(scope, SharingStarted.Eagerly, Int.MAX_VALUE) }
expensivebelly
08/07/2021, 9:25 AM
Nick Allen
08/09/2021, 6:52 PM
shareIn returns a flow that never completes, so later flows are never evaluated, and flatMapConcat doesn't evaluate its lambda until it's done with earlier flows.
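To see the flatMapConcat half of that explanation in isolation, here is a minimal sketch that records the order of events for two plain cold flows. The function name traceFlatMapConcat and the event strings are made up for illustration; only flow, flowOf, flatMapConcat and collect come from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: records what happens, and in which order,
// when two cold flows are passed through flatMapConcat.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
suspend fun traceFlatMapConcat(): List<String> {
    val events = mutableListOf<String>()
    val first = flow { events += "first started"; emit(1); events += "first finished" }
    val second = flow { events += "second started"; emit(2) }

    flowOf("first" to first, "second" to second)
        .flatMapConcat { (name, inner) ->
            // Called only once the previous inner flow has completed
            events += "lambda called for $name"
            inner
        }
        .collect { events += "received $it" }

    return events
}

fun main() = runBlocking {
    traceFlatMapConcat().forEach(::println)
}
```

In this trace "lambda called for second" appears only after "first finished". If the first inner flow never completes, as with a flow returned by shareIn, the lambda is never called for the second one.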
If you want concurrency, then you'll need to build your own, I think. This example uses channels and a semaphore:
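import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

fun <T> concatEager(vararg flows: Flow<T>, concurrency: Int = Int.MAX_VALUE): Flow<T> = channelFlow<T> {
    // Limits how many source flows are collected at the same time
    val semaphore = Semaphore(concurrency)
    // One unbounded channel per source flow, each filled by its own coroutine
    val channels = flows.map { f ->
        Channel<T>(Channel.UNLIMITED).also { c ->
            // UNDISPATCHED ensures correct order of withPermit calls
            launch(start = CoroutineStart.UNDISPATCHED) {
                semaphore.withPermit {
                    f.collect { c.send(it) }
                    c.close()
                }
            }
        }
    }
    // Drain the channels in argument order so the output order matches concat
    for (channel in channels) {
        for (item in channel) {
            send(item)
        }
    }
}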
If you wanted to add a "combined" buffer size like the Rx operator, that would have to be built in too, with an atomic counter. If you're OK with a per-flow capacity, that's easy: just pass it into the Channel constructor.
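A rough sketch of the per-flow capacity variant could look like the following. The name concatEagerBounded and the capacityPerFlow parameter are hypothetical, chosen here for illustration. The only change from the function above is that the capacity is passed to the Channel factory instead of Channel.UNLIMITED. This does not implement the "combined" buffer of the Rx operator.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical variant: same structure as concatEager, but each source flow
// gets a bounded channel, so a producer suspends once its own buffer is full.
fun <T> concatEagerBounded(
    vararg flows: Flow<T>,
    concurrency: Int = Int.MAX_VALUE,
    capacityPerFlow: Int = Channel.UNLIMITED // assumed name; applies per flow, not in total
): Flow<T> = channelFlow {
    val semaphore = Semaphore(concurrency)
    val channels = flows.map { f ->
        Channel<T>(capacityPerFlow).also { c ->
            // UNDISPATCHED keeps the withPermit calls in argument order
            launch(start = CoroutineStart.UNDISPATCHED) {
                semaphore.withPermit {
                    f.collect { c.send(it) }
                    c.close()
                }
            }
        }
    }
    // Drain in argument order so results are concatenated, not interleaved
    for (channel in channels) {
        for (item in channel) {
            send(item)
        }
    }
}

fun main() = runBlocking {
    val slow = flow { for (i in 1..3) { delay(50); emit(i) } }
    val fast = flow { for (i in 4..6) emit(i) }
    // fast starts immediately but buffers at most two items until slow is drained
    println(concatEagerBounded(slow, fast, capacityPerFlow = 2).toList()) // [1, 2, 3, 4, 5, 6]
}
```

Because channels are drained in the same order in which permits are requested, the earliest unfinished flow always holds a permit, so a small capacity slows producers down but should not stall the output.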