# coroutines
s
I'm trying this with something like
inline fun <T, R> Flow<T>.concurrentMap(
    concurrency: Int,
    @BuilderInference crossinline block: suspend (T) -> R
) = flow<R> {
    val scope = CoroutineScope(Executors.newFixedThreadPool(concurrency).asCoroutineDispatcher())
    val deferredResults = mutableListOf<Deferred<R>>()
    collect { value ->
        deferredResults.add(scope.async { block(value) })
    }
    deferredResults.awaitAll().forEach { newValue -> emit(newValue) }
    scope.cancel()
}
s
I think you could try `flatMapMerge` for this.
There's a long thread on the GitHub issue: https://github.com/Kotlin/kotlinx.coroutines/issues/1147
s
Cool, let me check
flatMapMerge it is
inline fun <T, R> Flow<T>.concurrentMap(
    concurrency: Int,
    @BuilderInference crossinline block: suspend (T) -> R
) = flatMapMerge(concurrency) { value -> flow { emit(block(value)) } }
s
👌🐕
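For anyone landing on this thread later, here is a minimal runnable sketch of the `flatMapMerge` version above. It makes a few assumptions that are not from the thread: `inline`, `crossinline` and `@BuilderInference` are dropped for simplicity, the `@OptIn` markers are there because `flatMapMerge` has been annotated as preview/experimental in kotlinx.coroutines releases, and the `main` function with its `delay` workload is purely illustrative. Note that `flatMapMerge` emits results as they complete, so the output order may differ from the source order, which is why the example sorts before printing.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Maps each element with `block`, running at most `concurrency` blocks at once.
// Each element becomes a one-element inner flow; flatMapMerge collects up to
// `concurrency` inner flows concurrently and merges their emissions.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun <T, R> Flow<T>.concurrentMap(
    concurrency: Int,
    block: suspend (T) -> R
): Flow<R> = flatMapMerge(concurrency) { value -> flow { emit(block(value)) } }

// Illustrative usage (hypothetical workload): smaller inputs take longer,
// so results tend to arrive out of source order.
fun main(): Unit = runBlocking {
    val results = (1..5).asFlow()
        .concurrentMap(concurrency = 3) { n ->
            delay(100L * (6 - n))
            n * n
        }
        .toList()
    // Sort because arrival order is not guaranteed.
    println(results.sorted()) // prints [1, 4, 9, 16, 25]
}
```

Compared with the first attempt in this thread, this version needs no dedicated thread pool or extra `CoroutineScope`, so there is no executor to shut down, and values are emitted downstream as soon as they are ready instead of after the whole upstream flow has been collected.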