What's the `Flow` equivalent to Rx Merge ? <http:/...
# coroutines
d
What's the
Flow
equivalent to Rx Merge ? http://reactivex.io/documentation/operators/merge.html
d
Gah, I need it fast 😞
Better make one, shouldn't be too challenging...
(Famous last words)
Copy code
@FlowPreview
fun <T> Flow<T>.mergeWith(other: Flow<T>): Flow<T> = flow {
    coroutineScope {
        launch {
            other.collect {
                emit(it)
            }
        }

        launch {
            collect {
                emit(it)
            }
        }
    }
}
Just caught up on that thread @zak.taccardi; plugging the vararg size into concurrencyLimit feels like it's driving some dead-weight logic in this scenario
Hope they go for a separate
merge
that runs more like your implementation - more direct.
k
@darkmoon_uk
The "gotcha" with concurrency limit is a strong motivator for a separate merge operator.
Looks like that's the desired approach
b
IIRC You won't be able to emit from inside of those other jobs. you'll probably need to do a
channelFlow
Copy code
fun <T> Flow<T>.mergeWith(other: Flow<T>) = channelFlow {
    launch { collect { send(it) } }
    launch { other.collect { send(it) } }
}
LOL the
channelFlow
documentation provides a
merge
implementation as its example https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/channel-flow.html
🤣 1
😅 1