frogger
02/21/2020, 2:27 PM
I have a loop that calls api.fetchEntry(entry), which blocks; there is also a variant that returns a Future&lt;String&gt;. What is the best way to run the fetches concurrently?
for (indexPage in api.fetchIndexIterable()) {
    for (entry in indexPage) {
        val document = api.fetchEntry(entry)
        val changed = computeSomeChanges(document)
        writeInOrder(changed)
    }
}
ribesg
02/21/2020, 2:30 PM
indexPage.asSequence()
    .map { api.fetchFutureEntry(it) }
    .map { computeSomeChanges(it.get()) }
    .forEach { writeInOrder(it) }
but I would rather use async coroutines
Viniciustelesbez
02/21/2020, 2:30 PM
marstran
02/21/2020, 2:40 PM
You could use a Flow. I think you can use the buffer function to fetch some amount of entries concurrently. Something like this:
api.fetchIndexIterable().flatten().asFlow()
    .map { api.fetchEntry(it) }
    .buffer(100)
    .collect { document ->
        val changed = computeSomeChanges(document)
        writeInOrder(changed)
    }
Leon K
02/21/2020, 2:43 PM
Doesn't buffer only run the collection and production concurrently, but not actually produce multiple values concurrently?
frogger
02/21/2020, 2:53 PM
ribesg
02/21/2020, 2:55 PM
You said there is a fetchEntry that returns a Future, right? Then use that
Leon K
02/21/2020, 2:55 PM
(or write a small parallelMap function that just executes a map concurrently)
ribesg
02/21/2020, 2:56 PM
indexPage.map { api.fetchEntryFuture(it) }
should start all queries at once but still return without waiting for anything. Your HTTP client should limit the amount of concurrent requests, but then you just have a bunch of Futures you can iterate over, and you only block on a request with get if it's not done yet.
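This Future-based pattern can be sketched with java.util.concurrent.CompletableFuture. Note that fetchEntryFuture below is a made-up stand-in for the real API call, simulated with a short sleep; the point is only that the requests start eagerly and get() blocks at most once per not-yet-finished request:

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit

// Stand-in for api.fetchEntryFuture(entry): starts the "request" immediately
// on a background thread and returns without blocking.
fun fetchEntryFuture(entry: Int): CompletableFuture<String> =
    CompletableFuture.supplyAsync {
        TimeUnit.MILLISECONDS.sleep(50) // simulated network latency
        "document-$entry"
    }

fun main() {
    val indexPage = (1..10).toList()
    // map does not wait: all ten "requests" are now in flight.
    val futures = indexPage.map { fetchEntryFuture(it) }
    // Iterate in order; get() only blocks if that particular
    // request has not completed yet.
    futures.forEach { future -> println(future.get()) }
}
```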
Leon K
02/21/2020, 2:58 PM
suspend fun <A, B> Iterable<A>.parallelMap(f: suspend (A) -> B): List<B> = coroutineScope {
    map { async { f(it) } }.awaitAll()
}
this only relies on coroutine stuff, and does effectively the exact same thing as the Future solution
Leon K
02/21/2020, 2:59 PM
indexPage.parallelMap { api.fetchEntry(it) }
executes all fetchEntry calls in parallel, so you'll need to make sure to chunk your list beforehand
ribesg
02/21/2020, 2:59 PM
frogger
02/21/2020, 3:03 PM
Leon K
02/21/2020, 3:04 PM
Nikola Milovic
02/21/2020, 3:19 PM
val list = _drillsTypeList.value
viewModelScope.launch {
    list?.map {
        async {
            val newDrillImageUrl = repository.getImageUrl(it.drillType_imageUrl)
            it.drillType_imageUrl = newDrillImageUrl
        }
    }?.awaitAll()
}.invokeOnCompletion {
    _drillsTypeList.postValue(list)
}

// the operation that has to finish first
suspend fun getImageUrl(storageLocation: String): String = suspendCoroutine { cont ->
    firebaseStorage.getReferenceFromUrl(storageLocation).downloadUrl.addOnSuccessListener {
        cont.resume(it.toString())
    }.addOnFailureListener {
        cont.resumeWithException(it)
    }
}
marstran
02/21/2020, 8:31 PM
coroutineScope {
    api.fetchIndexIterable()
        .flatten()
        .asFlow()
        .map { async { api.fetchEntry(it) } }
        .buffer(100)
        .collect { document ->
            val changed = computeSomeChanges(document.await())
            writeInOrder(changed)
        }
}
marstran
02/21/2020, 8:53 PM
Or use channelFlow, which will handle the coroutine scope for you:
channelFlow<Deferred<Document>> {
    api.fetchIndexIterable()
        .flatten()
        .forEach {
            send(async { api.fetchEntry(it) })
        }
}
    .buffer(100)
    .collect { document ->
        val changed = computeSomeChanges(document.await())
        writeInOrder(changed)
    }
Leon K
02/22/2020, 10:38 AM
Why not just use parallelMap? I don't see a reason why you'd actually need Flow here. Flow is a pretty complicated API that I'd only use if I actually need it
marstran
02/22/2020, 10:58 AM
Flow is lazy while Iterable is eager. The buffer method on Flow will also always keep 100 elements buffered. It will start fetching the 101st element as soon as you finish processing the 1st element. If you use chunking and parallelMap, then you will fetch 100 elements, then process them all, then fetch the next 100 elements. You cannot start processing the first element before the whole chunk is complete.
marstran
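This pipelining behavior can be seen in a small self-contained demo (the timings, element count, and buffer(3) in place of buffer(100) are all made up for illustration): with buffer, the producer keeps fetching ahead in its own coroutine while the collector is still processing earlier elements.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val start = System.currentTimeMillis()
    fun log(msg: String) = println("${System.currentTimeMillis() - start} ms: $msg")

    (1..6).asFlow()
        .map { n ->
            delay(100) // simulated fetch
            log("fetched $n")
            n
        }
        .buffer(3) // upstream runs in its own coroutine, up to 3 elements ahead
        .collect { n ->
            delay(200) // simulated processing
            log("processed $n")
        }
    // "fetched" lines keep appearing while earlier elements are still being
    // processed; with chunked + parallelMap, a whole chunk would be fetched
    // before any processing of it starts.
}
```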
02/22/2020, 10:59 AM
About using chunked to limit the number of concurrently running API calls: could you give an example?
marstran
02/22/2020, 11:00 AM
Leon K
02/22/2020, 11:01 AM
I'm not sure about the async part though. You'd then be creating 100 Deferreds, which would include the data processing.
I'm not sure about how buffer works, but as I understand the documentation, it does not execute multiple "generators" in parallel; it just allows consuming element n to happen in parallel with producing element n+1.
marstran
02/22/2020, 11:01 AM
If you buffer the Deferred objects, it will work as expected.
Leon K
02/22/2020, 11:15 AM
import kotlinx.coroutines.*
import kotlin.random.Random

suspend fun <A, B> Iterable<A>.parallelMap(f: suspend (A) -> B): List<B> = coroutineScope {
    map { async { f(it) } }.awaitAll()
}

suspend fun loadData(endpoint: Int): String {
    delay(Random.nextLong(100, 200))
    println("loading data from $endpoint")
    return "hi"
}

fun processData(data: String): String = data

suspend fun loadAndProcess(endpoints: List<Int>) = endpoints
    .chunked(10)
    .mapIndexed { idx, chunk ->
        println("processing chunk #$idx")
        chunk.parallelMap { endpoint -> processData(loadData(endpoint)) }
    }

fun main() = runBlocking {
    println(loadAndProcess(List(100) { it }))
}

Run this and you'll see what I mean.
It will first chunk the list of endpoints into lists of 10 (it would be 100, but 10 is easier to read ;D ).
Then, in sequence, for each chunk, it will start a parallelMap that loads and processes the data in parallel (this means that up to min(chunkSize, the maximum number of requests/processes that can actually run in parallel) will be executed in parallel). Each piece of data will be processed as soon as it arrives, and all of that in parallel. I've randomized the delay in loadData to easily show that the processes are in fact all running in parallel
Leon K
02/22/2020, 11:17 AM
Compared to Flow and the corresponding API:
- it reads exactly like synchronous code, except that one map call has been exchanged for parallelMap
- it's shorter
- idk about this one, but it might even be less resource intensive, as no Flow-required types need to be allocated; I'm not sure if this is true, or even if it is, whether it's relevant
02/22/2020, 11:18 AM
(the chunks are missing a flatten at the end of the loadAndProcess call chain, but that's beside the point)
marstran
02/22/2020, 11:42 AM
suspend fun loadAndProcess(endpoints: List<Int>) = channelFlow {
    endpoints.forEach { send(async { processData(loadData(it)) }) }
}.buffer(10)
    .map { it.await() }
    .toList()
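A runnable, self-contained version of this snippet, reusing the fake loadData/processData idea from the earlier example (all names and timings are made up): channelFlow provides the CoroutineScope that async needs, buffer(10) caps how many unawaited Deferreds (and so, roughly, in-flight requests) pile up, and awaiting in send order preserves result order.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.random.Random

// Fake endpoint call with randomized latency, standing in for a real request.
suspend fun loadData(endpoint: Int): String {
    delay(Random.nextLong(10, 50))
    return "data-$endpoint"
}

// Trivial stand-in for the processing step.
fun processData(data: String): String = data.uppercase()

// channelFlow's ProducerScope is a CoroutineScope, so async works inside it;
// buffer(10) limits how far the producer can run ahead of the consumer.
suspend fun loadAndProcess(endpoints: List<Int>): List<String> = channelFlow {
    endpoints.forEach { send(async { processData(loadData(it)) }) }
}.buffer(10)
    .map { it.await() }
    .toList()

fun main() = runBlocking {
    println(loadAndProcess(List(20) { it }))
}
```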
marstran
02/22/2020, 11:45 AM
The Flow solution is also probably less resource intensive, because you create no intermediate chunked lists.
Leon K
02/22/2020, 1:21 PM