https://kotlinlang.org logo
Title
s

Shreyas Patil

02/12/2022, 8:34 AM
I'm trying this with something like
inline fun <T, R> Flow<T>.concurrentMap(
    concurrency: Int,
    @BuilderInference crossinline block: suspend (T) -> R
) = flow<R> {
    val scope = CoroutineScope(Executors.newFixedThreadPool(concurrency).asCoroutineDispatcher())
    val deferredResults = mutableListOf<Deferred<R>>()
    collect { value ->
        deferredResults.add(scope.async { block(value) })
    }
    deferredResults.awaitAll().forEach { newValue -> emit(newValue) }
    scope.cancel()
}
s

Sam

02/12/2022, 8:38 AM
I think you could try
flatMapMerge
for this
There's a long thread on the GitHub issue: https://github.com/Kotlin/kotlinx.coroutines/issues/1147
s

Shreyas Patil

02/12/2022, 8:45 AM
Cool, let me check
flatMapMerge it is
inline fun <T, R> Flow<T>.concurrentMap(
    concurrency: Int,
    @BuilderInference crossinline block: suspend (T) -> R
) = flatMapMerge(concurrency) { value -> flow { emit(block(value)) } }
s

Sam

02/12/2022, 9:02 AM
👌:cool-doge: