# coroutines
e
Btw, `.map { it.await() }` seems to be a common pattern. We might want to introduce an extension like `.awaitAll()` for it.
👍 12
g
`.joinAll()` would be helpful as well
e
(It is in my internal todo list, but moving it to a public spot seems like a good idea)
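A minimal sketch of what such extensions could look like, assuming a recent non-experimental `kotlinx.coroutines` on the classpath. The names `awaitEach` and `joinEach` are hypothetical, picked so they cannot clash with anything the library itself provides; `squaresViaAwaitEach` is only a small demo.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: awaits every Deferred in list order and collects the results.
suspend fun <T> Collection<Deferred<T>>.awaitEach(): List<T> = map { it.await() }

// Hypothetical helper: waits for every Job to finish and ignores any results.
suspend fun Collection<Job>.joinEach() = forEach { it.join() }

// Demo: later items finish sooner, yet results come back in input order.
fun squaresViaAwaitEach(): List<Int> = runBlocking {
    (1..5).map { n ->
        async {
            delay(10L * (6 - n))
            n * n
        }
    }.awaitEach()
}
```

Both helpers wait in list order, so a slow first element still holds up everything behind it.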
b
is there an equivalent that returns them in order of completion?
g
What is your case for results in order of completion?
b
I was thinking if you don't care about order but wish to process results as quickly as possible. In a simple `.map {}`, if the first job in the list takes the longest you have to wait for it to finish before doing anything with any results
i remember c# also had something equivalent but i can't remember what it was called
g
If you need some additional processing of a result and you don’t want to wait for the other results, just do that inside the `async` block. It's very easy with coroutines and more natural:
```kotlin
(0..100).map { async {
    val taskResult = mySuspendingTask()
    processResult(taskResult)
} }.map { it.await() }
```
And as Roman said before, if you don’t want to collect all the results, just run `launch` for each task and omit `.await()`/`.join()`
b
Yea there's always ways, like `.map { it.await() }` instead of `.awaitAll()`. Sometimes it's nice to have a more elegant method like the `.as_completed` function.
g
you can use Channels this way, Channels api supports selectors and consumeEach
I’ve ported one of the examples from concurrent.futures to coroutines directly (so it uses a blocking network API, which is not the correct way to do it, but it still works; I’ve just wrapped it in a coroutine with its own dispatcher so the calling coroutine isn't blocked). And I don’t understand why I would need `as_completed` in this case:
```kotlin
import kotlinx.coroutines.experimental.*
import java.net.URL

val urls = listOf(
        "http://www.foxnews.com/",
        "http://www.cnn.com/",
        "http://europe.wsj.com/",
        "http://www.bbc.co.uk/",
        "http://some-made-up-domain.com"
)

// Dedicated pool for the blocking network calls
val dispatcher = newFixedThreadPoolContext(5, "NetworkDispatcher")

// timeout is in seconds
suspend fun loadUrl(url: String, timeout: Int): ByteArray {
    return run(dispatcher) {
        URL(url).openConnection().apply {
            connectTimeout = timeout * 1000
        }.getInputStream().buffered().use {
            it.readBytes()
        }
    }
}

fun main(args: Array<String>) = runBlocking {
    urls.forEach { url ->
        // Each page is reported as soon as its own download finishes
        launch(coroutineContext) {
            try {
                val data = loadUrl(url, 60)
                println("$url page is ${data.size} bytes")
            } catch (e: Exception) {
                println("$url generated an exception: $e")
            }
        }
    }
}
```
In the real world you would just use an async API for the network requests and move the dispatcher to `launch` to limit parallel requests
If you show me an example of how you use it and how it makes the code more elegant, I can probably show how to do the same with coroutines
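A rough sketch of the "dispatcher on `launch` limits parallelism" idea, assuming a recent non-experimental `kotlinx.coroutines`. The function name `maxParallelBlockingCalls` is hypothetical, and `Thread.sleep` stands in for a blocking network call. It counts how many tasks are in flight at once and returns the peak.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical demo: launches `tasks` blocking jobs on a pool of `poolSize` threads
// and returns the highest number of jobs observed running at the same time.
fun maxParallelBlockingCalls(poolSize: Int, tasks: Int): Int {
    val inFlight = AtomicInteger(0)
    val peak = AtomicInteger(0)
    val dispatcher = Executors.newFixedThreadPool(poolSize).asCoroutineDispatcher()
    try {
        runBlocking {
            repeat(tasks) {
                // The dispatcher passed to launch caps how many bodies run in parallel
                launch(dispatcher) {
                    val now = inFlight.incrementAndGet()
                    peak.updateAndGet { maxOf(it, now) }
                    Thread.sleep(50) // stand-in for a blocking network call
                    inFlight.decrementAndGet()
                }
            }
        } // runBlocking returns only after all launched children finish
    } finally {
        dispatcher.close() // shuts down the underlying thread pool
    }
    return peak.get()
}
```

Because the bodies block their threads, the pool size is the upper bound on concurrent calls, no matter how many coroutines are launched.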
b
i didn't have any use cases in mind, I was just thinking in general. It would make something like this nice:
```kotlin
val results = urls.map { async { fetch(it) } }

for (result in results.asCompleted()) {
    // display results as they come in, in a threadsafe manner
}
```
g
```kotlin
urls.map { launch {
    fetch(it)
    // display results as they come in, in a threadsafe manner
} }
```
does exactly the same
b
that's not threadsafe if they are on a threadpool
g
It can be threadsafe
if you want to process it in single thread just use Channel
b
yeah, channel would be easiest way to replicate that, but the channel is just implementation noise, it's more elegant / easier to reason without it
g
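```kotlin
val channel = Channel<ByteArray>()
// Producers: one coroutine per URL sends its result into the channel
urls.map { url -> launch { channel.send(loadUrl(url, 60)) } }
// Single consumer: handles results one at a time, in completion order
launch {
    channel.consumeEach {
        // display results as they come in, in a threadsafe manner
    }
}
```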
b
see that's less clear if you're just reading through the code
here's a quick example I threw together:
```kotlin
@Test
fun test() = runBlocking {
    val results = arrayOf(3, 5, 2, 1, 4).map {
        async {
            delay(it.toLong(), TimeUnit.SECONDS)
            it
        }
    }

    // Receives each value as soon as its Deferred completes
    for (result in results.asCompleted())
        println(result)
}

suspend fun <T> List<Deferred<T>>.asCompleted() = produce<T> {
    map { deferred ->
        // One forwarding coroutine per Deferred
        launch {
            send(deferred.await())
        }
    }.map { it.join() } // keep the producer alive until every value is sent
}
```
g
Yes, definitely, you can write such abstraction over Channel. But I would like to see a real use case, maybe it can be done in a different way
j
not to derail, but can Channel add some optional knobs and dials to become essentially reactive buffering ?
e
There are buffered implementations for channels. The buffer size is your dial.
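To illustrate the buffer-size dial, here is a small sketch. It assumes a recent `kotlinx.coroutines` (1.5 or later, where `trySend` is available), and the function name `acceptedWithoutSuspending` is hypothetical. It counts how many elements a channel takes with no receiver present, which is the number a sender can push before it would have to suspend.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical demo: tries to send `attempts` elements into a channel of the given
// capacity with nobody receiving, and returns how many were accepted immediately.
fun acceptedWithoutSuspending(capacity: Int, attempts: Int): Int {
    val channel = Channel<Int>(capacity)
    var accepted = 0
    for (i in 1..attempts) {
        // trySend never suspends; it fails once the buffer is full
        if (channel.trySend(i).isSuccess) accepted++
    }
    channel.close()
    return accepted
}
```

With capacity 2 the first two sends succeed and the rest fail. With capacity 0 (a rendezvous channel) none succeed, because there is no buffer and no waiting receiver.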