Nino
05/18/2022, 2:57 PMimport kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val flowA : Flow<Int> = flowOf(1, 2, 3)
flowA.flatMapLatest { a: Int ->
doSomeMemoryIntensiveStuffForFlowB(a).map { b: String ->
doSomeCpuIntensiveStuff(b)
}
}.collect { c: Long ->
println(c)
}
flowA.flatMapLatest { a: Int ->
doSomeMemoryIntensiveStuffForFlowB(a)
}.map { b: String ->
doSomeCpuIntensiveStuff(b)
}.collect { c: Long ->
println(c)
}
}
fun doSomeMemoryIntensiveStuffForFlowB(a: Int): Flow<String> {
// Some Memory intensive stuff related to "a" (object creation, etc)
var memoryGreedyList = listOf<Int>()
repeat(10_000) { memoryGreedyList = memoryGreedyList + listOf(it) }
return flowOf(a.toString(), a.toString(), a.toString())
}
fun doSomeCpuIntensiveStuff(b: String): Long {
// Some CPU intensive stuff related to "b" (mapping, etc)
repeat(10_000) { b.toDouble() * it }
return b.toLong()
}
https://pl.kotl.in/sOl65cZgZNick Allen
05/18/2022, 5:58 PMflatMapLatest
will cancel collecting from the previously received Flow
when it gets a new one. That includes the doSomeCpuIntensiveStuff
call for the "first" where it's inside the flatMapLatest
lambda but not the "second". So only the "first" will try to cancel doSomeCpuIntensiveStuff
in response to a new item.
flatMapLatest
puts items into a Channel
. That means the downstream Flow
operators are running in a different coroutine than the upstream (including what's in it's lambda). So for the second, doSomeMemoryIntensiveStuffForFlowB
could be processing an item while doSomeCpuIntensiveStuff
is still working on some previous item.
FYI: None of your "intensive" sample code is actually cancellable: https://kotlinlang.org/docs/cancellation-and-timeouts.html. But maybe that's skipped here for the purposes of simplifying the sample code.