Id love to understand why this piece of code becom...
# coroutines
z
I'd love to understand why this piece of code becomes unresponsive when a lot of work is already being done in the background. 🧵
interface Assembler<in T, out R> {

    suspend fun assemble(
        input: T,
    ): R

    // Invoked on Dispatchers.Default
    suspend fun assemble(
        list: List<T>,
    ): List<R> {
        // Responsive (Y)
        if (true) return list.map { assemble(it) }

        // Unresponsive if a lot of other work is being done
        return coroutineScope {
            list
                .map { entry ->
                    async {
                        // Queries SQL in this particular case
                        // withContext(Dispatchers.IO) wraps the query
                        assemble(entry)
                    }
                }
                .awaitAll()
        }
    }
}
s
By "unresponsive" do you mean it takes longer than expected, or does it hang indefinitely?
I don’t see any obvious issue in the code you shared; it would be interesting to also see the implementation of
assemble(entry)
z
Hmm, I'm having a hard time explaining the chain of events 😅 Imagine two queries happening simultaneously, both using the 'assemble' function I've shared above. One is quicker than the other, and if I just let both calls run their course, everything is fine. But if I do something that results in one of the calls being invoked again, it takes far longer than it otherwise does. In practice this means that when the first piece of data is loaded and presented to me, and I then edit something about it (which causes it to be reloaded in its new state), the reload takes far longer. I haven't been able to verify this, but it feels like it waits until the other, longer call has completed. The
assembler.assemble
is used in a
flow.mapLatest
block, so I'd expect the previous coroutineScope block to be cancelled whenever there's a new emission (to reload the data). And
assemble(entry)
queries my SQL database on the first call; after that it fetches its data from my cache. What's a bit suspicious to me is that one 'assemble(entry)' call can result in another 'assemble(entry)' call with another assembler, if that makes sense? So there can be multiple nested coroutineScope blocks like this. I don't know if that's a/the problem?
Actually, even if I remove the potentially nested coroutineScope blocks, it doesn't make any difference.
j
The caller of assemble must launch multiple coroutines. You are launching coroutines inside map, but you must do it in the caller too.
suspend fun caller() = coroutineScope {
  // launch needs a CoroutineScope receiver; coroutineScope provides one here
  launch { assemble() }
}
Another good practice is to check whether the coroutine is still active inside map. To do that, call
ensureActive()
inside map, to avoid a memory leak.
z
@Jhonatan Sabadi I don't think I should use launch here; assemble is invoked in a flow.mapLatest block and I need its result! I'm not seeing any difference with ensureActive. I suppose that's because I'm using mapLatest and the scope is actually cancelled (where ensureActive would otherwise throw).
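For reference, a minimal sketch of what ensureActive() inside map does when the surrounding job is cancelled. Everything here is illustrative: blockingWork is a hypothetical stand-in for non-suspending work such as a cache lookup, the timings are arbitrary, and kotlinx.coroutines is assumed to be on the classpath. It is not the code from the app discussed in this thread.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for non-suspending work (e.g. a cache lookup).
fun blockingWork(entry: Int): Int {
    Thread.sleep(10)
    return entry * 2
}

// Returns how many entries were processed before cancellation was noticed.
fun processedBeforeCancel(total: Int = 100): Int = runBlocking {
    val processed = AtomicInteger(0)
    val job = launch(Dispatchers.Default) {
        (1..total).map { entry ->
            // Throws CancellationException once this job has been cancelled.
            ensureActive()
            blockingWork(entry).also { processed.incrementAndGet() }
        }
    }
    delay(50)            // let a few entries run
    job.cancelAndJoin()  // cancel, then wait until the job has really stopped
    processed.get()
}

fun main() {
    println("processed ${processedBeforeCancel()} of 100 entries")
}
```

Without the ensureActive() call, the loop has no suspension point, so it would run through all entries even after the job is cancelled, and cancelAndJoin() would only return once the whole list had been processed.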
r
coroutineScope and supervisorScope will always wait for their child jobs to complete. Is this something that you expect?
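A small sketch of that waiting behaviour, assuming kotlinx.coroutines is on the classpath. The function name scopeWaitsForChildren and the event strings are made up for illustration.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Records the order of events so the waiting behaviour is visible.
suspend fun scopeWaitsForChildren(): List<String> {
    val events = mutableListOf<String>()
    coroutineScope {
        launch {
            delay(100)
            events += "child finished"
        }
        // The body ends here, but coroutineScope does not return yet.
        events += "scope body finished"
    }
    // Reached only after the launched child has completed.
    events += "after coroutineScope"
    return events
}

fun main() = runBlocking {
    println(scopeWaitsForChildren())
    // prints [scope body finished, child finished, after coroutineScope]
}
```

The same rule applies when a scope is cancelled: it only finishes once its children have finished reacting to the cancellation.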
z
@radityagumay I don't think I do; wouldn't the
coroutineScope
block above be cancelled together with its parent scope?
suspend fun test(){
    flowOf(1,2,3).mapLatest { 
        coroutineScope { 
            delay(5000) // This block would be cancelled pretty much instantly due to the next number arriving?
        }
    }.collect()
}
r
block above be cancelled together with its parent scope?
Correct, the code that you shared follows structured concurrency.
Correct, the above code would be cancelled immediately. See https://pl.kotl.in/-vaxKc7zVk
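One way to check this locally is to record which values make it through mapLatest. The sketch below is a variation on the test() snippet above, not the contents of the linked playground. The name latestOnly, the cancelled list, and the 100 ms delay are assumptions made for illustration, and kotlinx.coroutines is assumed to be on the classpath.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns the values whose mapLatest block ran to completion.
// Values whose block was cancelled are appended to `cancelled`.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun latestOnly(cancelled: MutableList<Int>): List<Int> =
    flowOf(1, 2, 3)
        .mapLatest { value ->
            try {
                coroutineScope {
                    delay(100) // suspension point, so cancellation is noticed here
                    value
                }
            } catch (e: CancellationException) {
                cancelled += value
                throw e // always rethrow so the cancellation propagates
            }
        }
        .toList()

fun main() = runBlocking {
    val cancelled = mutableListOf<Int>()
    val completed = latestOnly(cancelled)
    println("completed=$completed cancelled=$cancelled")
}
```

Only the last value should appear in the completed list. The same try/catch logging could be dropped into a real mapLatest block to confirm whether, and how quickly, earlier blocks are actually cancelled.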
z
Exactly! 👍🏽 That is what I'd expect to happen, and I'm fairly confident that is what's happening in my app as well. It's a bit harder to verify given the complexity of it, but I don't do anything weird; it's just structured concurrency all throughout. The call stack is practically:
MutableSharedFlow<Unit>.mapLatest{ suspendQueryCacheOrSql }.flowOn(Dispatchers.Default)
. At the time when these long query times are observed, every relevant piece of data is already in the cache. The assembler I've mentioned above comes into the picture after "suspendQueryCacheOrSql"; in this scenario in particular it converts a list of 750 elements using the coroutineScope block. Each element in that list results in additional cache queries and additional assemble calls (I've restricted the coroutineScope async to only happen for these 750 elements; the end result is still the same in terms of unresponsiveness).
I've considered whether all this work was somehow causing issues for the involved dispatchers, but that doesn't seem to be the case either. Other "command"-based (no query) actions happening throughout all of this are pretty much immediate, as always.