elizarov
11/29/2017, 10:22 AM
.map { it.await() } seems to be a common pattern. We might want to introduce an extension like .awaitAll() for it.
gildor
11/29/2017, 10:23 AM
.joinAll() would be helpful as well
elizarov
11/29/2017, 10:26 AM
bj0
11/29/2017, 4:48 PM
gildor
11/29/2017, 11:40 PM
bj0
11/29/2017, 11:51 PM
.map {}, if the first job in the list takes the longest, you have to wait for it to finish before doing anything with any results
gildor
11/30/2017, 12:48 AM
(0..100).map {
    async {
        val taskResult = mySuspendingTask()
        processResult(taskResult)
    }
}.map { it.await() }
.await()/.join()
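For reference, the .map { it.await() } pattern and the .awaitAll()/.joinAll() idea raised earlier in the thread could be sketched roughly as below. This is only an illustration, assuming a current kotlinx.coroutines 1.x release rather than the experimental package used in this thread. The names awaitEach, joinEach and squaresConcurrently are hypothetical, chosen so they do not clash with the awaitAll()/joinAll() extensions that newer library versions provide.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: awaits every Deferred in list order, same as .map { it.await() }.
suspend fun <T> List<Deferred<T>>.awaitEach(): List<T> = map { it.await() }

// Hypothetical helper: suspends until every Job in the list has completed.
suspend fun List<Job>.joinEach() = forEach { it.join() }

// Starts one async task per input and gathers the squares in input order.
fun squaresConcurrently(inputs: List<Int>): List<Int> = runBlocking {
    inputs.map { n ->
        async {
            delay(10L * n) // simulated work; larger inputs finish later
            n * n
        }
    }.awaitEach()
}

fun main() {
    println(squaresConcurrently(listOf(3, 1, 2)))
}
```

The tasks run concurrently, but the results come back in input order (9, 1, 4 for inputs 3, 1, 2), because each await() is called in list order regardless of which task finishes first.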
bj0
11/30/2017, 2:57 AM
.map { it.await() } instead of .awaitAll(). Sometimes it's nice to have a more elegant method like the .as_completed function.
gildor
11/30/2017, 3:12 AM
as_completed in this case:
import kotlinx.coroutines.experimental.*
import java.net.URL

val urls = listOf(
    "http://www.foxnews.com/",
    "http://www.cnn.com/",
    "http://europe.wsj.com/",
    "http://www.bbc.co.uk/",
    "http://some-made-up-domain.com"
)

val dispatcher = newFixedThreadPoolContext(5, "NetworkDispatcher")

// The blocking download runs on the dedicated network thread pool
suspend fun loadUrl(url: String, timeout: Int): ByteArray {
    return run(dispatcher) {
        URL(url).openConnection().apply {
            connectTimeout = timeout * 1000
        }.getInputStream().buffered().use {
            it.readBytes()
        }
    }
}

fun main(args: Array<String>) = runBlocking {
    urls.forEach { url ->
        // Each launch is a child of runBlocking, so main waits for all of them
        launch(coroutineContext) {
            try {
                val data = loadUrl(url, 60)
                println("$url page is ${data.size} bytes")
            } catch (e: Exception) {
                println("$url generated an exception: $e")
            }
        }
    }
}
bj0
11/30/2017, 4:06 AM
val results = urls.map { async { fetch(it) } }
for (result in results.asCompleted()) {
    // display results as they come in, in a threadsafe manner
}
gildor
11/30/2017, 4:07 AM
urls.map {
    launch {
        fetch(it)
        // display results as they come in, in a threadsafe manner
    }
}
bj0
11/30/2017, 4:07 AM
gildor
11/30/2017, 4:08 AM
bj0
11/30/2017, 4:10 AM
gildor
11/30/2017, 4:10 AM
val channel = Channel<ByteArray>()
urls.map { url -> launch { channel.send(loadUrl(url, 60)) } }
launch {
    channel.consumeEach {
        // display results as they come in, in a threadsafe manner
    }
}
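One detail the channel snippet above leaves open is when the consumer stops: nothing closes the channel, so consumeEach would keep waiting after the last result. A minimal sketch of one way to handle that follows, assuming a current kotlinx.coroutines 1.x release rather than the experimental package used in this thread. fakeLoad and collectInCompletionOrder are hypothetical names, and fakeLoad merely stands in for loadUrl with a delay.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical stand-in for loadUrl(): finishes after `millis` and returns that value.
suspend fun fakeLoad(millis: Long): Long {
    delay(millis)
    return millis
}

// Runs all loads concurrently and collects results in the order they complete.
fun collectInCompletionOrder(delays: List<Long>): List<Long> = runBlocking {
    val channel = Channel<Long>()
    // One producer coroutine per task; each sends its result as soon as it is ready.
    val producers = delays.map { d -> launch { channel.send(fakeLoad(d)) } }
    // Close the channel once every producer has finished, so the consumer loop can end.
    launch {
        producers.joinAll()
        channel.close()
    }
    val received = mutableListOf<Long>()
    // Single consumer: results are handled one at a time, in arrival order.
    for (value in channel) received += value
    received
}

fun main() {
    println(collectInCompletionOrder(listOf(300L, 100L, 200L)))
}
```

Because only one coroutine reads from the channel, the handling code does not need extra synchronization, and the results should arrive shortest delay first rather than in list order.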
bj0
11/30/2017, 4:14 AM
@Test
fun test() = runBlocking {
    val results = arrayOf(3, 5, 2, 1, 4).map {
        async {
            delay(it.toLong(), TimeUnit.SECONDS)
            it
        }
    }
    for (result in results.asCompleted())
        println(result)
}

suspend fun <T> List<Deferred<T>>.asCompleted() = produce {
    map { deferred ->
        launch {
            send(deferred.await())
        }
    }.map { it.join() }
}
gildor
11/30/2017, 5:46 AM
jimn
12/03/2017, 7:26 AM
elizarov
12/03/2017, 7:29 AM