Shreyas Patil
02/12/2022, 8:34 AM
/**
 * Applies [block] to every upstream value on a dedicated fixed thread pool and
 * emits the results.
 *
 * The whole upstream flow is collected first, starting one `async` task per
 * value on a pool of [concurrency] threads. Only after upstream completes are
 * all tasks awaited and their results emitted, in the order the values were
 * collected. Nothing is emitted before upstream completes, so this is not
 * suitable for infinite flows.
 *
 * @param concurrency number of threads in the pool that runs [block];
 *   `Executors.newFixedThreadPool` rejects non-positive values.
 * @param block transformation applied to each upstream value.
 * @return a cold flow of the transformed values.
 */
inline fun <T, R> Flow<T>.concurrentMap(
    concurrency: Int,
    @BuilderInference crossinline block: suspend (T) -> R
): Flow<R> = flow {
    // Keep a handle on the dispatcher: cancelling the scope alone does not
    // shut down the executor that backs it.
    val dispatcher = Executors.newFixedThreadPool(concurrency).asCoroutineDispatcher()
    val scope = CoroutineScope(dispatcher)
    try {
        val deferredResults = mutableListOf<Deferred<R>>()
        collect { value ->
            deferredResults.add(scope.async { block(value) })
        }
        deferredResults.awaitAll().forEach { newValue -> emit(newValue) }
    } finally {
        // Runs on success, failure and cancellation alike, so neither pending
        // tasks nor the pool's threads outlive this collection.
        scope.cancel()
        dispatcher.close()
    }
}
Sam
02/12/2022, 8:38 AM
flatMapMerge for this
Shreyas Patil
02/12/2022, 8:45 AM
/**
 * Maps each upstream value through [block], delegating the concurrent
 * execution to `flatMapMerge`.
 *
 * Every upstream value is wrapped in a one-element inner flow that emits
 * `block(value)`; the inner flows are merged by `flatMapMerge`, with
 * [concurrency] passed through as its concurrency argument.
 *
 * NOTE(review): results are presumably emitted as they complete rather than
 * in upstream order; confirm against the `flatMapMerge` documentation.
 *
 * @param concurrency forwarded unchanged to `flatMapMerge`.
 * @param block transformation applied to each upstream value.
 * @return a flow of the transformed values.
 */
inline fun <T, R> Flow<T>.concurrentMap(
    concurrency: Int,
    @BuilderInference crossinline block: suspend (T) -> R
): Flow<R> = flatMapMerge(concurrency = concurrency) { upstreamValue ->
    flow {
        val mapped = block(upstreamValue)
        emit(mapped)
    }
}
Sam
02/12/2022, 9:02 AM