frogger:
How could I roughly optimize this code so that (at least) the slow part, `api.fetchEntry(entry)`, runs concurrently? (There is a lot of IO wait here, and parallel fetching works well.) The writes must stay in order, and there is a maximum number of documents that fit into memory at the same time (let's say 100). I just need a hint in the right direction, no running code. For `api.fetchEntry(entry)` there is also a variant that returns a `Future<String>`.
```kotlin
for (indexPage in api.fetchIndexIterable()) {
    for (entry in indexPage) {
        val document = api.fetchEntry(entry)
        val changed = computeSomeChanges(document)
        writeInOrder(changed)
    }
}
```
ribesg:
You could rewrite the same thing in a more functional way using `Sequence`, something like `indexPage.asSequence().map { api.fetchFutureEntry(it) }.map { computeSomeChanges(it.get()) }.forEach { writeInOrder(it) }`, but I would rather use async coroutines.
marstran:
I would use a `Flow`. I think you can use the `buffer` function to fetch some number of entries concurrently. Something like this:
```kotlin
api.fetchIndexIterable().flatten().asFlow()
  .map { api.fetchEntry(it) }
  .buffer(100)
  .collect { document ->
    val changed = computeSomeChanges(document)
    writeInOrder(changed)
  }
```
Leon K:
Doesn't `buffer` only run the collection and production concurrently, but not actually produce multiple values concurrently?
frogger:
That seems to be the really hard question here: how to efficiently load and buffer up to 100 entries. How would it work with your approach, @ribesg?
ribesg:
You have a version of `fetchEntry` that returns a `Future`, right? Then use that.
Leon K:
As I said, you could try to chunk your list into pieces, then `parallelMap` over them (I pretty much always have a `parallelMap` function around that just executes a `map` concurrently).
ribesg:
`indexPage.map { api.fetchEntryFuture(it) }` should start all queries at once but still return without waiting for anything. Your HTTP client should limit the number of concurrent requests, and then you just have a bunch of `Future`s you can iterate over, only blocking with `get` on a request that isn't done yet.
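A rough sketch of what ribesg describes, with `chunked` layered on top to respect the 100-document memory cap (`fetchEntryFuture` stands for the `Future<String>`-returning variant mentioned in the question):

```kotlin
for (indexPage in api.fetchIndexIterable()) {
    for (chunk in indexPage.chunked(100)) {
        // all requests in the chunk start immediately
        val futures = chunk.map { api.fetchEntryFuture(it) }
        // iterate in submission order; get() only blocks on a request that isn't done yet
        for (future in futures) {
            writeInOrder(computeSomeChanges(future.get()))
        }
    }
}
```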
Leon K:
```kotlin
// Runs f over all elements concurrently and waits for all results
suspend fun <A, B> Iterable<A>.parallelMap(f: suspend (A) -> B): List<B> = coroutineScope {
    map { async { f(it) } }.awaitAll()
}
```
This only relies on coroutine machinery and does effectively the exact same thing as the `Future` solution. So `indexPage.parallelMap { api.fetchEntry(it) }` executes all the `fetchEntry` calls in parallel, which means you'll need to make sure to chunk your list beforehand.
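A minimal sketch of that chunking, using the names from the thread (this has to run in a suspend context, since `parallelMap` is a suspend function):

```kotlin
// At most 100 documents are fetched and held in memory at a time;
// within and across chunks, writes happen in the original order.
indexPage.chunked(100).forEach { chunk ->
    chunk.parallelMap { api.fetchEntry(it) }
        .map { computeSomeChanges(it) }
        .forEach { writeInOrder(it) }
}
```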
ribesg:
@frogger, do you already use coroutines, Flows, etc.? If you do, use that. If you don't, don't. You don't need them for what you want to do here. It's still a good idea to learn coroutines, but it's not a small task.
frogger:
The point was that there is not enough memory to run everything at once. @Leon K suggested chunking it, which would work. (It seems there is nothing better so far.) In classic multitasking I would have looked for a producer/consumer setup with a bounded buffer, I guess. But the functional approaches here look much nicer (and have easier error handling).
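For comparison, here is a sketch of that bounded-buffer producer/consumer expressed with coroutines (an illustration under the thread's assumed API, not something posted in the thread): a `Channel` with capacity 100 plays the role of the bounded buffer, and `send` suspends while it is full.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

suspend fun fetchAndWrite() = coroutineScope {
    val buffer = Channel<Deferred<String>>(capacity = 100)
    launch {
        // producer: start each fetch and park its Deferred in the bounded buffer
        for (indexPage in api.fetchIndexIterable()) {
            for (entry in indexPage) {
                buffer.send(async { api.fetchEntry(entry) })
            }
        }
        buffer.close()
    }
    // consumer: drain in submission order, so writes stay in order
    for (deferred in buffer) {
        writeInOrder(computeSomeChanges(deferred.await()))
    }
}
```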
Leon K:
Yeah, coroutines and functional programming (read: immutable data structures) make a lot of classic multithreading ceremony obsolete (in many cases).
n:
Can I chip in with something similar? Thanks to the other responses I got the general idea of how to approach this. I'm trying to loop through a list and update its elements, and once everything is done, update the live data. Currently that isn't happening: the list contains 3 elements and only 2 are processed. `invokeOnCompletion` isn't what I am looking for, it seems, because it is called before all of the asyncs are finished.
```kotlin
val list = _drillsTypeList.value
viewModelScope.launch {
    list?.map {
        async {
            val newDrillImageUrl = repository.getImageUrl(it.drillType_imageUrl)
            it.drillType_imageUrl = newDrillImageUrl
        }
    }?.awaitAll()
}.invokeOnCompletion {
    _drillsTypeList.postValue(list)
}

// the operation that has to finish first
suspend fun getImageUrl(storageLocation: String): String = suspendCoroutine { cont ->
    firebaseStorage.getReferenceFromUrl(storageLocation).downloadUrl.addOnSuccessListener {
        cont.resume(it.toString())
    }.addOnFailureListener {
        cont.resumeWithException(it)
    }
}
```
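One common way to restructure this (a sketch, not an answer given in the thread): move the `postValue` inside the same coroutine, after `awaitAll()` returns, instead of relying on `invokeOnCompletion`:

```kotlin
viewModelScope.launch {
    // awaitAll() suspends this coroutine until every async has finished
    list?.map { drill ->
        async { drill.drillType_imageUrl = repository.getImageUrl(drill.drillType_imageUrl) }
    }?.awaitAll()
    // by this point all updates are done
    _drillsTypeList.postValue(list)
}
```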
marstran:
I see now that my `buffer` example was slightly wrong. As @Leon K said, it will not actually cause the fetching to run concurrently. To fix this, we can instead make a flow of async computations. I have a feeling that there is a cleaner version using only flows, but I can't think of it right now 🙂 Something like this:
```kotlin
coroutineScope {
  api.fetchIndexIterable()
    .flatten()
    .asFlow()
    .map { async { api.fetchEntry(it) } }
    .buffer(100)
    .collect { document ->
      val changed = computeSomeChanges(document.await())
      writeInOrder(changed)
    }
}
```
Actually, you could use `channelFlow`, which will handle the coroutine scope for you:
```kotlin
channelFlow<Deferred<Document>> {
  api.fetchIndexIterable()
    .flatten()
    .forEach {
      send(async { api.fetchEntry(it) })
    }
}
.buffer(100)
.collect { document ->
  val changed = computeSomeChanges(document.await())
  writeInOrder(changed)
}
```
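(Worth noting, though it isn't spelled out in the thread: this is also what keeps memory bounded. `buffer(100)` gives the underlying channel a capacity of 100, so `send` suspends once 100 `Deferred`s are waiting, capping the number of in-flight fetches at roughly 100.)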
Leon K:
@marstran, but why not just use my proposed `parallelMap`? I don't see a reason why you'd actually need `Flow` here. `Flow` is a pretty complicated API that I'd only use if I actually needed it.
marstran:
Mostly because `Flow` is lazy while `Iterable` is eager. The `buffer` operator on a `Flow` will also always keep 100 elements buffered: it will start fetching the 101st element as soon as you finish processing the 1st. If you use chunking and `parallelMap`, then you fetch 100 elements, process them all, and only then fetch the next 100. You cannot start processing the first element before the whole chunk is complete.

Also, I don't really see how you would use `chunked` to limit the number of concurrently running API calls. Could you give an example?

I kind of have the opposite experience with flows. They have made my code simpler, not more complex.
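A minimal runnable sketch of that "sliding" behavior (my illustration; `buffer(2)` and a slow consumer stand in for the 100-element case):

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    (1..5).asFlow()
        .onEach { println("produced $it") }
        .buffer(2)       // the producer may run up to 2 elements ahead of the consumer
        .collect {
            delay(100)   // slow consumer
            println("consumed $it")
        }
}
```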
Leon K:
You can absolutely do that by including the processing in the `async` part, though. You'd then be creating 100 `Deferred`s which would include the data processing. I'm not sure how `buffer` works, but as I understand the documentation, it does not execute multiple "generators" in parallel; it just allows consuming element `n` to happen in parallel with producing element `n+1`.
marstran:
Yes, but since I'm buffering `Deferred` objects, it will work as expected.
Leon K:
```kotlin
import kotlinx.coroutines.*
import kotlin.random.Random

// Runs f over all elements concurrently and waits for all results
suspend fun <A, B> Iterable<A>.parallelMap(f: suspend (A) -> B): List<B> = coroutineScope {
    map { async { f(it) } }.awaitAll()
}

// Simulates a network call with a randomized delay
suspend fun loadData(endpoint: Int): String {
    delay(Random.nextLong(100, 200))
    println("loading data from $endpoint")
    return "hi"
}

fun processData(data: String): String = data

suspend fun loadAndProcess(endpoints: List<Int>) = endpoints
    .chunked(10)
    .mapIndexed { idx, chunk ->
        println("processing chunk #$idx")
        chunk.parallelMap { endpoint -> processData(loadData(endpoint)) }
    }

fun main() = runBlocking {
    println(loadAndProcess(List(100) { it }))
}
```
Run this and you'll see what I mean. It first chunks the list of endpoints into lists of 10 (it would be 100 in your case, but 10 is easier to read ;D). Then, in sequence, for each chunk it starts a `parallelMap` that loads and processes the data in parallel (meaning that up to chunkSize requests/processes, or however many can actually execute at once, run in parallel). Each piece of data is processed as soon as it arrives, and all of that happens in parallel. I've randomized the delay in `loadData` to make it easy to see that the processes are in fact all running in parallel.
I find this a lot more readable because:
- it requires no knowledge of `Flow` and the corresponding API
- it reads exactly like synchronous code, except that one `map` call has been exchanged for `parallelMap`
- it's shorter
- (idk about this one) it might even be less resource intensive, as no `Flow`-related types need to be allocated, but I'm not sure if this is true, or, even if it is, whether it's relevant
(You might want to `flatten` at the end of the `loadAndProcess` call chain, but that's beside the point.)
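For completeness, the `flatten` mentioned above would look like this (hypothetical usage, run from a coroutine):

```kotlin
// List<List<String>> -> List<String>: one flat list of processed results
val results: List<String> = loadAndProcess(List(100) { it }).flatten()
```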
marstran:
Hmm, but this completes one chunk entirely before you start the next one. The flow solution is more efficient because it "slides" through the list of API calls. It's also not shorter. Here's how the flow code would look, using your example:
```kotlin
suspend fun loadAndProcess(endpoints: List<Int>) = channelFlow {
    endpoints.forEach { endpoint -> send(async { processData(loadData(endpoint)) }) }
}.buffer(10)
 .map { it.await() }
 .toList()
```
The `Flow` solution is also probably less resource intensive, because you create no intermediate chunked lists.
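Hypothetical usage, mirroring the `runBlocking` call from the earlier example:

```kotlin
fun main() = runBlocking {
    println(loadAndProcess(List(100) { it }))
}
```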
Leon K:
The only thing making the non-flow-based solution slower is the short window where no new data is being fetched: the point where all data from chunk 1 has been fetched but the last piece has not yet finished processing. Although I guess there's nothing wrong with the flow-based solution either, if the buffering actually works that way ;D