Austin
05/15/2023, 4:08 PMsuspend functions so I wanted to check the isActive flag, but that is always true
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
suspend fun main() {
val inputFlow = MutableStateFlow(false)
coroutineScope {
launch {
inputFlow
.collectLatest { input ->
coroutineScope {
println("processing - input: $input")
while (isActive) {
println("is active")
Thread.sleep(1_000) // simulate CPU tasks
}
println("processed - input: $input")
}
}
}
launch {
repeat(10) {
println("Publishing")
inputFlow.value = !inputFlow.value
delay(1_000)
}
}
}
}
Prints:
Publishing
processing - input: true
is active
Publishing
is active
Publishing
is active
Publishing
is active
<repeat forever>Joffrey
05/15/2023, 4:12 PMisActive to return false here? Nothing cancels any scope in this snippetAustin
05/15/2023, 4:13 PMJoffrey
05/15/2023, 4:14 PMcollect, not collectLatestAustin
05/15/2023, 4:17 PMPublishing
processing - input: true
Publishing
processing - input: false
Publishing
processed - input: false
processing - input: true
Publishing
processed - input: true
processing - input: false
Publishing
but I am unable to convert the algorithm to use suspend methods (especially since there is no suspending code in it (it has no I/O, just a lot of number crunching in very large data structures)Chantry Cargill
05/15/2023, 4:24 PMwithContext(Dispatchers.Default) {
// do your business
}Joffrey
05/15/2023, 4:27 PMyield() within a while(true) rather than checking isActive. This allows the coroutine to free the thread so other coroutines can do stuff. Maybe in your case the collectLatest internals don't get an opportunity to cancel your loop if you don't suspend anywhere. When I add yield(), it behaves as intended.Austin
05/15/2023, 4:30 PMsuspend since its highly recursive algorithm and this said it makes the performance plummet (https://stackoverflow.com/questions/48129878/kotlin-suspend-function-recursive-call)Joffrey
05/15/2023, 4:31 PMwhile loop. I thought your recursive algorithm was represented with the sleep, not the loop, and you wanted to cancel between the callsJoffrey
05/15/2023, 4:32 PMJoffrey
05/15/2023, 4:32 PMisActive worked as you expected, how would you have expected to get the value of isActive from a non-suspend function? Did you intend to pass the job down in the recursive calls?Austin
05/15/2023, 4:33 PMisActive route I was trying to go do would have let me pass the corotuineContext around and not pay the cost of the suspending machinery each function callJoffrey
05/15/2023, 4:34 PMJoffrey
05/15/2023, 4:35 PMrunBlocking)Austin
05/15/2023, 4:42 PMvar latestJob: Job? = null
inputFlow
.collectLatest { input ->
println("processing - input: $input")
latestJob?.cancel()
latestJob = async {
repeat(4) {
ensureActive()
Thread.sleep(1_000)
}
println("processed - input: $input")
}
}
}Joffrey
05/15/2023, 5:24 PMcoroutineScope { launch { ... }} (it seems stupid but it solves the issue the same way)Austin
05/15/2023, 5:30 PMsuspend fun main() {
val inputFlow = MutableStateFlow(0)
coroutineScope {
launch {
inputFlow
.collectLatest { input ->
println("processing - input: $input")
coroutineScope {
launch {
repeat(4) {
println("I am running still for $input")
Thread.sleep(1_000)
}
println("processed - input: $input")
}
}
}
}
launch {
repeat(10) {
val newValue = inputFlow.value + 1
println("Publishing $newValue")
inputFlow.value = newValue
delay(1_000)
}
}
}
}
prints
Publishing 1
processing - input: 1
I am running still for 1
Publishing 2
I am running still for 1
Publishing 3
I am running still for 1
Publishing 4
I am running still for 1
Publishing 5
processed - input: 1
processing - input: 2
processing - input: 5
I am running still for 5
I am running still for 5
Publishing 6
I am running still for 5
Publishing 7
Publishing 8
I am running still for 5
processed - input: 5
Publishing 9
processing - input: 6
processing - input: 9
I am running still for 9
I am running still for 9
Publishing 10
I am running still for 9
I am running still for 9
processed - input: 9
processing - input: 10
I am running still for 10
I am running still for 10
I am running still for 10
I am running still for 10
processed - input: 10Joffrey
05/15/2023, 5:31 PMisActive check (or interruption mechanism)Austin
05/15/2023, 5:32 PMAustin
05/15/2023, 5:32 PMPublishing 1
processing - input: 1
I am running still for 1
Publishing 2
processing - input: 2
I am running still for 2
Publishing 3
processing - input: 3
I am running still for 3
that seems to be correct now, thx!Joffrey
05/15/2023, 5:33 PMisActive, here is the interruption-based version:
suspend fun main() {
val inputFlow = MutableStateFlow(-1)
coroutineScope {
launch {
inputFlow
.collectLatest { input ->
println("Receiving value=$input from thread ${Thread.currentThread().name}")
coroutineScope {
launch {
runInterruptible {
println("Starting recursion on thread ${Thread.currentThread().name}...")
cpuIntensiveRecursion(input)
}
}
}
}
}
launch {
repeat(10) {
println("Publishing value=$it from thread ${Thread.currentThread().name}")
inputFlow.value = it
delay(1_000)
}
}
}
}
private fun cpuIntensiveRecursion(input: Int) {
// the loop simulates the point where the recursive code would check for interruption
while (!Thread.currentThread().isInterrupted) {
cpuIntensiveCall()
println("Still running on input $input... (thread : ${Thread.currentThread().name})")
}
println("Stopped input $input processing due to interruption")
}
private fun cpuIntensiveCall() {
repeat(50_000) {
val byteArray = ByteArray(2048)
SecureRandom.getInstanceStrong().nextBytes(byteArray)
if (byteArray.hashCode() < 1000) {
println("LUCKY STRIKE!")
}
}
}Austin
05/15/2023, 5:35 PMTrevor Stone
05/15/2023, 5:47 PMyield() and seeing what the performance impact actually was for your use-case?Austin
05/15/2023, 5:57 PMuli
05/17/2023, 7:02 AMwithContext as @Chantry Cargill mentioned.
You did not mention which dispatcher this all runs on. If your dispatcher is single threaded, or all your threads are in use, your tight loop might starve collectLatest from canceling your collector.Joffrey
05/17/2023, 7:25 AMsuspend fun main (which doesn't have a dispatcher) and uses launch. AFAICT it uses the same coroutine to run the collectLatest lambda and to manage the cancellation, which makes it synchronously alternate between the collect body and the collectLatest internals on suspension points, but when there is none, it's kinda stuck. I think that's why the nested coroutineScope + launch seems to work, I believe it creates a new coroutine that can work in parallel of the collectLatest internals, thus allowing the cancellation, even though the collect lambda still has to wait for that coroutine.uli
05/17/2023, 7:28 AMAustin
05/17/2023, 7:13 PMJoffrey
05/17/2023, 9:13 PM