#flow

expensivebelly

07/21/2021, 11:17 AM
Is there an equivalent of `Observable.concatArrayEager(...)` in Flow?

Nick Allen

08/02/2021, 11:00 PM
You can make any `Flow` "eager" with the `shareIn` operator. I think this would work:
```kotlin
fun <T> concatEager(scope: CoroutineScope, vararg flows: Flow<T>): Flow<T> =
    flows.asFlow()
        .flatMapConcat { it.shareIn(scope, SharingStarted.Eagerly, Int.MAX_VALUE) }
```

expensivebelly

08/07/2021, 9:25 AM
How do I adjust the concurrency in there, though, like I can with the RxJava operator?

Nick Allen

08/09/2021, 6:52 PM
Sorry, my earlier suggestion is completely broken anyway. `shareIn` returns a flow that never completes, and `flatMapConcat` doesn't evaluate its lambda for a flow until it's done with the earlier flows, so the later flows are never evaluated. If you want concurrency, then you'll need to build your own, I think. This example uses channels and a semaphore:
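```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

fun <T> concatEager(vararg flows: Flow<T>, concurrency: Int = Int.MAX_VALUE): Flow<T> = channelFlow<T> {
    // Limits how many source flows are collected at the same time
    val semaphore = Semaphore(concurrency)
    // One unbounded buffer per source flow, kept in the original order
    val channels = flows.map { f ->
        Channel<T>(Channel.UNLIMITED).also { c ->
            // UNDISPATCHED ensures correct order of withPermit calls
            launch(start = CoroutineStart.UNDISPATCHED) {
                semaphore.withPermit {
                    f.collect { c.send(it) }
                    c.close()
                }
            }
        }
    }
    // Drain the buffers one after another so the output keeps the concat order
    for (channel in channels) {
        for (item in channel) {
            send(item)
        }
    }
}
```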
If you want a "combined" buffer size like the Rx operator has, that would have to be built in too, with an atomic counter. If you're OK with a per-flow capacity, that's easy: just pass it into the `Channel` constructor.
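For the per-flow capacity option, a minimal sketch might look like the following. The name `concatEagerBounded` and the `capacityPerFlow` parameter are made up for illustration and are not part of kotlinx.coroutines; the only real changes from the snippet above are the capacity passed to `Channel(...)` and closing the channel in a `finally` block. It assumes a kotlinx.coroutines version where `collect` accepts a lambda without an extra import (1.6 or later).

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical variant: capacityPerFlow bounds each source's buffer separately.
fun <T> concatEagerBounded(
    vararg flows: Flow<T>,
    concurrency: Int = Int.MAX_VALUE,
    capacityPerFlow: Int = Channel.BUFFERED,
): Flow<T> = channelFlow {
    val semaphore = Semaphore(concurrency)
    val channels = flows.map { f ->
        // A bounded channel makes a fast source suspend once its buffer is full
        Channel<T>(capacityPerFlow).also { c ->
            // UNDISPATCHED keeps the withPermit calls in source order
            launch(start = CoroutineStart.UNDISPATCHED) {
                semaphore.withPermit {
                    try {
                        f.collect { c.send(it) }
                    } finally {
                        c.close()
                    }
                }
            }
        }
    }
    // Drain in source order so the result is still a concatenation
    for (channel in channels) {
        for (item in channel) {
            send(item)
        }
    }
}

fun main(): Unit = runBlocking {
    val slow = flow<Int> { delay(100); emit(1); emit(2) }
    val fast = flow<Int> { emit(3); emit(4) }
    // fast is collected while slow is still delaying, but is emitted after it
    println(concatEagerBounded(slow, fast, capacityPerFlow = 16).toList()) // [1, 2, 3, 4]
}
```

Because permits are acquired in source order, the flow currently being drained always holds a permit, so a full buffer on a later flow only pauses that flow rather than stalling the whole thing.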