Nino
05/18/2022, 2:57 PM
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
/**
 * Runs two variants of the same pipeline back to back and prints every result.
 *
 * Both variants turn each upstream number into an inner flow of strings via
 * `flatMapLatest` and convert those strings to longs; they differ only in where
 * the CPU-heavy conversion is attached.
 */
fun main() = runBlocking {
    val source: Flow<Int> = flowOf(1, 2, 3)

    // Variant 1: the conversion is part of the inner flow built inside the
    // flatMapLatest lambda, so it belongs to the flow that is cancelled when a
    // newer upstream value arrives.
    val convertedInside = source.flatMapLatest { number ->
        doSomeMemoryIntensiveStuffForFlowB(number).map { text ->
            doSomeCpuIntensiveStuff(text)
        }
    }
    convertedInside.collect { result ->
        println(result)
    }

    // Variant 2: the conversion is applied downstream of flatMapLatest, i.e.
    // to whatever the flattened flow emits.
    val convertedOutside = source
        .flatMapLatest { number ->
            doSomeMemoryIntensiveStuffForFlowB(number)
        }
        .map { text ->
            doSomeCpuIntensiveStuff(text)
        }
    convertedOutside.collect { result ->
        println(result)
    }
}
/**
 * Simulates memory-heavy preparation for [a] and returns a flow that emits
 * the decimal string form of [a] three times.
 *
 * The allocation work runs eagerly when this function is called, before the
 * returned flow is collected; its result is intentionally discarded.
 */
fun doSomeMemoryIntensiveStuffForFlowB(a: Int): Flow<String> {
    // Deliberately wasteful: every iteration copies the list built so far into
    // a brand-new list with one extra element.
    var scratch: List<Int> = emptyList()
    for (index in 0 until 10_000) {
        scratch = scratch + listOf(index)
    }

    val text = a.toString()
    return flowOf(text, text, text)
}
/**
 * Simulates CPU-heavy processing of [b] and returns [b] parsed as a [Long].
 *
 * This is a plain (non-suspending) function, so the busy loop below contains
 * no suspension points.
 *
 * @param b text that must be parseable both as a double and as a long
 * @return the numeric value of [b]
 * @throws NumberFormatException if [b] cannot be parsed
 */
fun doSomeCpuIntensiveStuff(b: String): Long {
    // Busy work only: each product is computed and immediately thrown away.
    for (multiplier in 0 until 10_000) {
        b.toDouble() * multiplier
    }
    return b.toLong()
}
https://pl.kotl.in/sOl65cZgZ
Nick Allen
05/18/2022, 5:58 PM
flatMapLatest will cancel collecting from the previously received Flow when it gets a new one. That includes the doSomeCpuIntensiveStuff call for the "first" where it's inside the flatMapLatest lambda but not the "second". So only the "first" will try to cancel doSomeCpuIntensiveStuff in response to a new item.
flatMapLatest puts items into a Channel. That means the downstream Flow operators are running in a different coroutine than the upstream (including what's in its lambda). So for the second, doSomeMemoryIntensiveStuffForFlowB could be processing an item while doSomeCpuIntensiveStuff is still working on some previous item.
FYI: None of your "intensive" sample code is actually cancellable: https://kotlinlang.org/docs/cancellation-and-timeouts.html. But maybe that's skipped here for the purposes of simplifying the sample code.