christophsturm

01/31/2021, 6:22 PM
Is there a way to write this code:
class Worker(val id: Int) {
    fun work(): Worker {
        Thread.sleep(1000)
        return Worker(id)
    }
}


println(measureTimeMillis { runBlocking {
    listOf(Worker(1), Worker(2), Worker(3))
        .map { worker ->
            async(Dispatchers.Default) {
                val worker2 = worker.work()
                worker2.work()
            }
        }.awaitAll()
}})
with 2 maps instead?
listOf(Worker(1), Worker(2), Worker(3)).map {it.work()}.map{it.work()}
but still run the workers on multiple threads? I know how I could do it with a channel, but is it also possible with a flow?
Arslan Armanuly

02/01/2021, 5:24 AM
The example with lists will work perfectly:
suspend fun main()  {
    listOf(Worker(1), Worker(2), Worker(3))
        .mapAsync(GlobalScope) { it.work().also { println("${it.id} done") } }
        .mapAsync(GlobalScope) { it.work().also { println("${it.id} done") } }

}

suspend inline fun <reified T, reified R> Iterable<T>.mapAsync(
    scope: CoroutineScope,
    crossinline block: (T) -> R
) = map { scope.async { block(it) } }.map { it.await() }
I tried to redo it with flows, and this is the best thing I could come up with:
suspend fun main() {
    listOf(Worker(1), Worker(2), Worker(3))
        .asFlow()
        .asyncMap(GlobalScope) { it.work().also { println(it.id) } }
        .asyncMap(GlobalScope) { it.work().also { println(it.id) } }
        .collect()
}

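// Note: shareIn returns a SharedFlow, which never completes, so collect() on the result does not return.
inline fun <T, R> Flow<T>.asyncMap(
    scope: CoroutineScope,
    crossinline block: suspend (T) -> R
): Flow<R> = shareIn(scope, SharingStarted.WhileSubscribed()) // use the passed-in scope instead of hardcoding GlobalScope
    .map { scope.async { block(it) } }
    .shareIn(scope, SharingStarted.WhileSubscribed())
    .map { it.await() }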
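For comparison, here is a rough sketch of another way to get a concurrent map stage on a flow, using `channelFlow` instead of `shareIn`. `mapConcurrently` and `runTwoStages` are hypothetical names, not part of kotlinx.coroutines. The sketch assumes unbounded concurrency is acceptable and that the order of results does not matter, and it shortens the sleep in `Worker` so it finishes quickly.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Same shape as the Worker from the question; the sleep is shortened for this sketch.
class Worker(val id: Int) {
    fun work(): Worker {
        Thread.sleep(100)
        return Worker(id)
    }
}

// Hypothetical helper (not a library function): starts one coroutine per upstream
// element on Dispatchers.Default and sends each result downstream as soon as it is
// ready. Results may arrive in any order, and concurrency is not limited.
fun <T, R> Flow<T>.mapConcurrently(block: suspend (T) -> R): Flow<R> = channelFlow {
    collect { value ->
        launch(Dispatchers.Default) { send(block(value)) }
    }
    // channelFlow waits for the launched coroutines before the flow completes.
}

// Two chained stages: a worker can enter the second stage as soon as its own
// first stage is done, without waiting for the other workers.
fun runTwoStages(): List<Int> = runBlocking {
    listOf(Worker(1), Worker(2), Worker(3))
        .asFlow()
        .mapConcurrently { it.work() }
        .mapConcurrently { it.work() }
        .toList()
        .map { it.id }
}
```

Unlike the `shareIn` version, this flow completes once all workers are done, so a terminal operator such as `toList()` or `collect()` returns. If order or a concurrency limit matters, this sketch would need to be adapted.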
christophsturm

02/01/2021, 5:47 PM
Thanks a lot for that! My use case, and the reason I want to use flows, is this: I have a loop that does multiple CPU-intensive steps. By default they should stay on the same thread, because that's probably the fastest way to do it, but for some use cases it would be great if I could get the results after the first round of work. The non-coroutine way would probably be to send the results to a queue (or channel), or to call a callback that records the intermediate results, and somehow I got the feeling that flows are meant to replace exactly that.
Right now I use coroutines mostly as an abstraction for threading and queues, to have less JVM-specific code, and maybe my use case doesn't fit coroutines so well. But maybe I can learn something.
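The use case described above (work stays sequential, but results are visible after the first round) could be sketched roughly like this. Everything here is an assumption for illustration: `RoundResult`, `twoRounds` and `collectRounds` are hypothetical names, the work is split into exactly two rounds, and the sleep in `Worker` is shortened.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Same shape as the Worker from the question; the sleep is shortened for this sketch.
class Worker(val id: Int) {
    fun work(): Worker {
        Thread.sleep(100)
        return Worker(id)
    }
}

// Hypothetical result type: which round finished for which worker.
data class RoundResult(val round: Int, val workerId: Int)

// Runs both rounds sequentially inside one flow builder and emits an
// intermediate result after every step, replacing a callback or queue.
fun twoRounds(workers: List<Worker>): Flow<RoundResult> = flow {
    val afterFirstRound = mutableListOf<Worker>()
    for (worker in workers) {
        val next = worker.work()
        afterFirstRound += next
        emit(RoundResult(1, next.id)) // available to the collector right away
    }
    for (worker in afterFirstRound) {
        val next = worker.work()
        emit(RoundResult(2, next.id))
    }
}

// The caller decides where the work runs: flowOn moves the whole builder to
// Dispatchers.Default, while the steps themselves stay sequential.
fun collectRounds(): List<RoundResult> = runBlocking {
    twoRounds(listOf(Worker(1), Worker(2)))
        .flowOn(Dispatchers.Default)
        .toList()
}
```

A collector that only cares about the first round could filter on `round == 1`. Since the builder is sequential, the results arrive in a fixed order.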