frogger (02/21/2020, 2:27 PM):
For api.fetchEntry(entry) there is also a variant that returns a Future<String>.
for (indexPage in api.fetchIndexIterable()) {
    for (entry in indexPage) {
        val document = api.fetchEntry(entry)
        val changed = computeSomeChanges(document)
        writeInOrder(changed)
    }
}

ribesg (02/21/2020, 2:30 PM):
indexPage.asSequence().map { api.fetchFutureEntry(it) }.map { computeSomeChanges(it.get()) }.forEach { writeInOrder(it) } but I would rather use async coroutines

Viniciustelesbez (02/21/2020, 2:30 PM):

marstran (02/21/2020, 2:40 PM):
You could use a Flow. I think you can use the buffer function to fetch some amount of entries concurrently. Something like this:
api.fetchIndexIterable().flatten().asFlow()
    .map { api.fetchEntry(it) }
    .buffer(100)
    .collect { document ->
        val changed = computeSomeChanges(document)
        writeInOrder(changed)
    }
Leon K (02/21/2020, 2:43 PM):
Doesn't buffer only run the collection and production concurrently, rather than actually producing multiple values concurrently?

frogger (02/21/2020, 2:53 PM):

ribesg (02/21/2020, 2:55 PM):
There is a variant of fetchEntry that returns a Future, right? Then use that.

Leon K (02/21/2020, 2:55 PM):
(or use a parallelMap function that just executes a map concurrently)

ribesg (02/21/2020, 2:56 PM):
indexPage.map { api.fetchEntryFuture(it) } should start all queries at once but still return without waiting for anything. Your HTTP client should limit the amount of concurrent requests, but then you just have a bunch of Futures you can iterate on, and you only block on a request if it's not done yet when you call get.
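The Future-based approach ribesg describes can be sketched in isolation. Everything below is a stand-in: fetchEntryFuture is a made-up simulation of the hypothetical Future-returning API variant, using the JVM's common pool instead of a real HTTP client.

```kotlin
import java.util.concurrent.CompletableFuture

// Stand-in for the hypothetical Future-returning API variant: it returns
// immediately, and the work completes on a background thread.
fun fetchEntryFuture(entry: Int): CompletableFuture<String> =
    CompletableFuture.supplyAsync { "document-$entry" }

fun main() {
    val indexPage = listOf(1, 2, 3)
    val documents = indexPage
        .map { fetchEntryFuture(it) } // all requests start at once
        .map { it.get() }             // iterate in order, blocking only on futures not yet done
    println(documents)                // [document-1, document-2, document-3]
}
```

As ribesg notes, this delegates concurrency limiting to the HTTP client; nothing in the pipeline itself caps how many requests are in flight.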
Leon K (02/21/2020, 2:58 PM):
suspend fun <A, B> Iterable<A>.parallelMap(f: suspend (A) -> B): List<B> = coroutineScope {
    map { async { f(it) } }.awaitAll()
}
This only relies on coroutine machinery, and does effectively the exact same thing as the Future solution.

Leon K (02/21/2020, 2:59 PM):
Note that indexPage.parallelMap { api.fetchEntry(it) } executes all fetchEntry calls in parallel, so you'll need to make sure to chunk your list beforehand.
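That chunking constraint can be sketched compactly. Here fetchEntry is a delay-based placeholder for the thread's real API, and the chunk size of 4 is arbitrary:

```kotlin
import kotlinx.coroutines.*

suspend fun <A, B> Iterable<A>.parallelMap(f: suspend (A) -> B): List<B> = coroutineScope {
    map { async { f(it) } }.awaitAll()
}

// Placeholder for the thread's api.fetchEntry.
suspend fun fetchEntry(entry: Int): String {
    delay(10)
    return "doc-$entry"
}

fun main() = runBlocking {
    val indexPage = (1..10).toList()
    // At most 4 fetches are in flight at any time; order is preserved.
    val documents = indexPage
        .chunked(4)
        .flatMap { chunk -> chunk.parallelMap { fetchEntry(it) } }
    println(documents.first()) // doc-1
}
```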
ribesg (02/21/2020, 2:59 PM):

frogger (02/21/2020, 3:03 PM):

Leon K (02/21/2020, 3:04 PM):
Nikola Milovic (02/21/2020, 3:19 PM):
val list = _drillsTypeList.value
viewModelScope.launch {
    list?.map {
        async {
            val newDrillImageUrl = repository.getImageUrl(it.drillType_imageUrl)
            it.drillType_imageUrl = newDrillImageUrl
        }
    }?.awaitAll()
}.invokeOnCompletion {
    _drillsTypeList.postValue(list)
}

// the operation that has to finish first
suspend fun getImageUrl(storageLocation: String): String = suspendCoroutine { cont ->
    firebaseStorage.getReferenceFromUrl(storageLocation).downloadUrl.addOnSuccessListener {
        cont.resume(it.toString())
    }.addOnFailureListener {
        cont.resumeWithException(it)
    }
}
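Nikola's getImageUrl uses the general callback-to-suspend bridging pattern. A self-contained sketch of that pattern, where downloadUrlAsync is a made-up stand-in for Firebase's addOnSuccessListener/addOnFailureListener API:

```kotlin
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.runBlocking

// Made-up callback-style API, standing in for the Firebase Task listeners.
fun downloadUrlAsync(location: String, onSuccess: (String) -> Unit, onFailure: (Exception) -> Unit) {
    if (location.startsWith("gs://"))
        onSuccess("https://example.invalid/" + location.removePrefix("gs://"))
    else
        onFailure(IllegalArgumentException("bad location: $location"))
}

// suspendCoroutine parks the caller until one of the callbacks resumes it.
suspend fun getUrl(location: String): String = suspendCoroutine { cont ->
    downloadUrlAsync(location, { cont.resume(it) }, { cont.resumeWithException(it) })
}

fun main() = runBlocking {
    println(getUrl("gs://bucket/img.png")) // https://example.invalid/bucket/img.png
}
```

For callback APIs that support cancellation, suspendCancellableCoroutine is usually preferred over suspendCoroutine, since it lets you propagate coroutine cancellation to the underlying call.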
marstran (02/21/2020, 8:31 PM):
coroutineScope {
    api.fetchIndexIterable()
        .flatten()
        .asFlow()
        .map { async { api.fetchEntry(it) } }
        .buffer(100)
        .collect { document ->
            val changed = computeSomeChanges(document.await())
            writeInOrder(changed)
        }
}
marstran (02/21/2020, 8:53 PM):
You can also use channelFlow, which will handle the coroutine scope for you:
channelFlow<Deferred<Document>> {
    api.fetchIndexIterable()
        .flatten()
        .forEach {
            send(async { api.fetchEntry(it) })
        }
}
    .buffer(100)
    .collect { document ->
        val changed = computeSomeChanges(document.await())
        writeInOrder(changed)
    }
Leon K (02/22/2020, 10:38 AM):
Why not just use parallelMap? I don't see a reason why you'd actually need Flow here. Flow is a pretty complicated API that I'd only use if I actually needed it.

marstran (02/22/2020, 10:58 AM):
Flow is lazy while Iterable is eager. The buffer method on Flow will also always keep 100 elements buffered: it will start fetching the 101st element as soon as you finish processing the 1st element. If you use chunking and parallelMap, then you will fetch 100 elements, then process them all, then fetch the next 100 elements. You cannot start processing the first element before the whole chunk is complete.
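The eager/lazy distinction marstran is relying on can be observed directly. A small sketch (the log strings are invented for illustration; kotlinx.coroutines assumed on the classpath):

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val log = mutableListOf<String>()

    // Eager: Iterable.map runs its lambda for every element immediately.
    listOf(1, 2).map { log += "iter $it"; it }

    // Lazy: the flow's map does not run until the flow is collected.
    val flow = listOf(1, 2).asFlow().map { log += "flow $it"; it }
    log += "before collect"
    flow.collect { }

    println(log) // [iter 1, iter 2, before collect, flow 1, flow 2]
}
```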
marstran (02/22/2020, 10:59 AM):
I don't see how you would use chunked to limit the number of concurrently running API calls. Could you give an example?

marstran (02/22/2020, 11:00 AM):

Leon K (02/22/2020, 11:01 AM):
With the async part, though, you'd then be creating 100 Deferreds, which would include the data processing. I'm not sure about how buffer works, but as I understand the documentation, it does not execute multiple "generators" in parallel; it just allows consuming element n to happen in parallel with producing element n+1.

marstran (02/22/2020, 11:01 AM):
Since the flow emits Deferred objects, it will work as expected.
Leon K (02/22/2020, 11:15 AM):
import kotlinx.coroutines.*
import kotlin.random.Random

suspend fun <A, B> Iterable<A>.parallelMap(f: suspend (A) -> B): List<B> = coroutineScope {
    map { async { f(it) } }.awaitAll()
}

suspend fun loadData(endpoint: Int): String {
    delay(Random.nextLong(100, 200))
    println("loading data from $endpoint")
    return "hi"
}

fun processData(data: String): String = data

suspend fun loadAndProcess(endpoints: List<Int>) = endpoints
    .chunked(10)
    .mapIndexed { idx, chunk ->
        println("processing chunk #$idx")
        chunk.parallelMap { endpoint -> processData(loadData(endpoint)) }
    }

fun main() = runBlocking {
    println(loadAndProcess(List(100) { it }))
}

Run this and you'll see what I mean. It will first chunk the list of endpoints into lists of 10 (it would be 100, but 10 is easier to read ;D). Then, in sequence, for each of those chunks, it starts a parallelMap that loads and processes the data in parallel. This means up to the minimum of the chunk size and the maximum number of requests/processes that can be executed in parallel will run concurrently. Each piece of data is processed as soon as it arrives, and all of that happens in parallel. I've randomized the delay in loadData to easily show that the processes are in fact all running in parallel.
Leon K (02/22/2020, 11:17 AM):
Advantages over Flow and the corresponding API:
- it reads exactly like synchronous code, except that one map call has been exchanged for parallelMap
- it's shorter
- idk about this one, but it might even be less resource intensive, as no Flow-required types need to be allocated; I'm not sure if this is true, or, even if it is, whether it's relevant

Leon K (02/22/2020, 11:18 AM):
(you'd probably want a flatten at the end of the loadAndProcess call chain, but that's beside the point)
marstran (02/22/2020, 11:42 AM):
suspend fun loadAndProcess(endpoints: List<Int>) = channelFlow {
    endpoints.forEach { endpoint -> send(async { processData(loadData(endpoint)) }) }
}.buffer(10)
    .map { it.await() }
    .toList()

marstran (02/22/2020, 11:45 AM):
The Flow solution is also probably less resource intensive, because you create no intermediate chunked lists.

Leon K (02/22/2020, 1:21 PM):