Zoltan Demant
09/29/2022, 9:37 AMZoltan Demant
09/29/2022, 9:38 AMinterface Assembler<in T, out R> {
suspend fun assemble(
input: T,
): R
// Invoked on Dispatchers.Default
suspend fun assemble(
list: List<T>,
): List<R> {
// Responsive (Y)
if (true) return list.map { assemble(it) }
// Unresponsive if a lot of other work is being done
return coroutineScope {
list
.map { entry ->
async {
// Queries SQL in this particular case
// withContext(<http://Dispatchers.IO|Dispatchers.IO>) wraps the query
assemble(entry)
}
}
.awaitAll()
}
}
}
Sam
09/29/2022, 9:54 AMSam
09/29/2022, 9:57 AMassemble(entry)
Zoltan Demant
09/29/2022, 10:16 AMassembler.assemble
is used in a flow.mapLatest
block, so Id expect the previous coroutineScope block to be cancelled whenever theres a new emission (to reload the data).
And assemble(entry)
queries my SQL database for the first call, after that it fetches its data from my cache. Whats a bit suspicious to me is that one 'assemble(entry)' call can result in another 'assemble(entry)' call with another assembler, if that makes sense? So multiple coroutineScope blocks with this. Idk if thats a/the problem?Zoltan Demant
09/29/2022, 10:21 AMJhonatan Sabadi
09/29/2022, 12:03 PMsuspend fun caller() {
launch{ assemble() }
}
Jhonatan Sabadi
09/29/2022, 12:04 PMensureActive()
inside map, to avoid memory leakZoltan Demant
09/29/2022, 1:40 PMradityagumay
10/01/2022, 12:00 PMZoltan Demant
10/02/2022, 1:38 PMcoroutineScope
block above be cancelled together with its parent scope?
suspend fun test(){
flowOf(1,2,3).mapLatest {
coroutineScope {
delay(5000) //This block would be cancelled pretty much instantly due to the next number arriving?
}
}.collect()
}
radityagumay
10/03/2022, 1:00 AMblock above be cancelled together with its parent scope?correct, the code that you shared it's follow structured concurrency
radityagumay
10/03/2022, 1:08 AMZoltan Demant
10/03/2022, 4:13 AMMutableSharedFlow<Unit>.mapLatest{ suspendQueryCacheOrSql }.flowOn(Dispatchers.Default)
. At the time when these long query times are observed, every relevant piece of data is already in the cache. The assembler Ive mentioned above comes into the picture after "`suspendQueryCacheOrSql`"; for this scenario in particular it converts a list of 750 elements using the coroutineScope block; each element in that list results in additional cache queries, and additional assemble calls (Ive restricted the coroutineScope async to only happen for these 750 elements, the end result is still the same in terms of unresponsiveness).Zoltan Demant
10/03/2022, 4:17 AM