christophsturm
01/31/2021, 6:22 PM
class Worker(val id: Int) {
/**
 * Simulates one unit of blocking work.
 *
 * Blocks the calling thread for one second via [Thread.sleep], then returns a
 * freshly constructed [Worker] carrying the same [id].
 */
fun work(): Worker = run {
    Thread.sleep(1000)
    Worker(id)
}
}
// Gives each worker its own coroutine on Dispatchers.Default, runs its two
// sequential work() steps there, waits for all of them, and prints the elapsed
// wall-clock time in milliseconds.
println(
    measureTimeMillis {
        runBlocking {
            val pending = listOf(Worker(1), Worker(2), Worker(3)).map { worker ->
                async(Dispatchers.Default) { worker.work().work() }
            }
            pending.awaitAll()
        }
    }
)
Can this be written with two maps instead?
// Two plain map stages, each calling work() on every worker in turn.
listOf(Worker(1), Worker(2), Worker(3))
    .map(Worker::work)
    .map(Worker::work)
but still running the workers in multiple threads?
I know how I could do it with a channel, but is it also possible with a flow?
Arslan Armanuly
02/01/2021, 5:24 AM
suspend fun main() {
// Both stages launch their coroutines in a scope owned by main rather than in
// GlobalScope. The workers are therefore children of main: main waits for them,
// and a failure or cancellation propagates instead of leaving orphaned
// coroutines running in the background.
coroutineScope {
    listOf(Worker(1), Worker(2), Worker(3))
        .mapAsync(this) { worker -> worker.work().also { println("${it.id} done") } }
        .mapAsync(this) { worker -> worker.work().also { println("${it.id} done") } }
}
}
/**
 * Applies [block] to every element concurrently and returns the results in the
 * receiver's iteration order.
 *
 * One coroutine per element is started in [scope]; the call then suspends until
 * all of them have finished. If any of the coroutines fails, that failure is
 * rethrown to the caller.
 *
 * @param scope scope in which the per-element coroutines are started.
 * @param block transformation applied to each element (a regular, non-suspending lambda).
 * @return the transformed elements, in input order; empty if the receiver is empty.
 */
suspend inline fun <T, R> Iterable<T>.mapAsync(
    scope: CoroutineScope,
    crossinline block: (T) -> R
): List<R> = map { element -> scope.async { block(element) } }.awaitAll()
I tried to redo it with flows and this is the best thing I could come up with
/**
 * Sends three workers through two `asyncMap` stages as a flow, printing each
 * worker's id after every stage.
 *
 * The stages are given a scope owned by main instead of GlobalScope, so the
 * coroutines they start are children of main and are cancelled together with it.
 */
suspend fun main() {
    coroutineScope {
        listOf(Worker(1), Worker(2), Worker(3))
            .asFlow()
            .asyncMap(this) { worker -> worker.work().also { println(it.id) } }
            .asyncMap(this) { worker -> worker.work().also { println(it.id) } }
            .collect()
    }
}
/**
 * Maps the flow concurrently: each upstream value is passed to [block] in its own
 * coroutine started in [scope], and the results are emitted downstream in
 * upstream order.
 *
 * The `buffer()` between the two `map` stages runs the stage that starts the
 * coroutines separately from the stage that awaits them, so further work keeps
 * being started while earlier results are still pending (bounded by the default
 * buffer capacity).
 *
 * The returned flow is cold and completes when the upstream completes, so a
 * terminal `collect()` on it returns. No sharing through `shareIn` is involved,
 * and nothing is launched outside of [scope].
 *
 * @param scope scope in which the per-element coroutines are started.
 * @param block suspending transformation applied to each upstream value.
 * @return a flow of the transformed values, in upstream order.
 */
inline fun <T, R> Flow<T>.asyncMap(
    scope: CoroutineScope,
    crossinline block: suspend (T) -> R
): Flow<R> =
    map { value -> scope.async { block(value) } }
        .buffer() // decouple starting work from awaiting it so the work overlaps
        .map { deferred -> deferred.await() }
christophsturm
02/01/2021, 5:47 PM