a
What is the correct way to abandon a collectLatest block that has a tight CPU loop? I am doing some heavy tree-structure processing, so I want to avoid the cost of recursive suspend functions. I wanted to check the isActive flag instead, but it is always true:
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch

suspend fun main() {
    val inputFlow = MutableStateFlow(false)

    coroutineScope {
        launch {
            inputFlow
                .collectLatest { input ->
                    coroutineScope {
                        println("processing - input: $input")

                        while (isActive) {
                            println("is active")
                            Thread.sleep(1_000) // simulate CPU tasks
                        }

                        println("processed - input: $input")
                    }
                }
        }

        launch {
            repeat(10) {
                println("Publishing")
                inputFlow.value = !inputFlow.value

                delay(1_000)
            }
        }
    }
}
Prints:
Publishing
processing - input: true
is active
Publishing
is active
Publishing
is active
Publishing
is active
<repeat forever>
j
What do you mean by "recursive suspend functions"? There doesn't seem to be any recursion involved here. Also, at what point would you expect isActive to return false here? Nothing cancels any scope in this snippet.
a
The repeat keeps pushing new values into the state flow, so collectLatest should cancel the running block. I left out the CPU-heavy recursive code; there's no way to extract it into a small runnable snippet.
j
Ah, my bad, for some reason I read collect, not collectLatest.
a
Using a suspending call to simulate the work (delay instead of Thread.sleep) behaves correctly:
Publishing
processing - input: true
Publishing
processing - input: false
Publishing
processed - input: false
processing - input: true
Publishing
processed - input: true
processing - input: false
Publishing
But I am unable to convert the algorithm to use suspend functions, especially since there is no suspending code in it (no I/O, just a lot of number crunching on very large data structures).
c
I believe you'd want to do the CPU-intensive task in another coroutine, i.e.:
withContext(Dispatchers.Default) { 
  // do your business
}
j
I usually use yield() within a while(true) rather than checking isActive. This allows the coroutine to free the thread so other coroutines can do stuff. Maybe in your case the collectLatest internals don't get an opportunity to cancel your loop if you don't suspend anywhere. When I add yield(), it behaves as intended.
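A minimal sketch of the yield() approach, for illustration only. The helper name runYieldDemo, the log list, and all timings are assumptions made up for this example; Thread.sleep stands in for a chunk of CPU-bound work.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.launch
import kotlinx.coroutines.yield

// Hypothetical demo helper: returns the log lines so the behaviour can be inspected.
suspend fun runYieldDemo(): List<String> {
    val log = CopyOnWriteArrayList<String>()
    val inputFlow = MutableStateFlow(0)

    coroutineScope {
        val collector = launch {
            inputFlow.collectLatest { input ->
                log += "processing - input: $input"
                repeat(4) {
                    Thread.sleep(250) // stands in for one chunk of CPU-bound work
                    yield()           // suspension point; throws CancellationException if cancelled
                }
                log += "processed - input: $input"
            }
        }

        repeat(3) {
            inputFlow.value += 1 // publish 1, 2, 3
            delay(400)
        }
        delay(1_500)       // leave time for the last value to finish
        collector.cancel() // a StateFlow never completes, so stop collecting explicitly
    }
    return log
}

suspend fun main() {
    runYieldDemo().forEach(::println)
}
```

With these timings, only the last published value should get as far as its "processed" line; earlier ones are cancelled at a yield() call. Whether the cost of yield() matters in a deeply recursive algorithm would have to be measured.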
a
I was trying to avoid suspend since it's a highly recursive algorithm, and this answer says it makes performance plummet (https://stackoverflow.com/questions/48129878/kotlin-suspend-function-recursive-call)
j
@Austin but you modeled it with a while loop. I thought your recursive algorithm was represented by the sleep, not the loop, and that you wanted to cancel between the calls.
I get your issue now, thanks.
Now, even if isActive worked as you expected, how would you have expected to get the value of isActive from a non-suspend function? Did you intend to pass the job down in the recursive calls?
a
Sorry, the isActive route I was trying to take would have let me pass the coroutineContext around and not pay the cost of the suspending machinery on each function call.
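A rough sketch of what passing the job down a plain recursive function could look like. Node and sumTree are hypothetical names invented for this illustration; the real algorithm was not posted.

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.job
import kotlinx.coroutines.runBlocking

// Hypothetical tree type standing in for the real data structure.
class Node(val value: Long, val children: List<Node> = emptyList())

// Plain (non-suspend) recursion; the Job is just an ordinary parameter.
fun sumTree(node: Node, job: Job): Long {
    job.ensureActive() // throws CancellationException once the job has been cancelled
    var total = node.value
    for (child in node.children) {
        total += sumTree(child, job)
    }
    return total
}

fun main() = runBlocking {
    val tree = Node(1, listOf(Node(2), Node(3, listOf(Node(4)))))
    // coroutineContext.job is the Job of the coroutine calling the algorithm.
    println(sumTree(tree, coroutineContext.job)) // prints 10
}
```

The check only helps if something else is actually able to cancel the job while the recursion is running.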
j
One thing you can do is use thread interruption instead.
Make your recursive algorithm interruption-aware, and then wrap the call to your algorithm in runInterruptible (the opposite of runBlocking).
a
Hmm, I can't think of how to tie collectLatest to interrupting the thread, though, since it fully blocks. I did get this to work, but it seems silly to have to store the state outside:
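launch {
    // Track the previous computation manually so a new value can cancel it
    var latestJob: Job? = null
    inputFlow
        .collectLatest { input ->
            println("processing - input: $input")
            latestJob?.cancel()

            // async (a child of the outer launch) returns immediately, so the collectLatest block never blocks
            latestJob = async {
                repeat(4) {
                    ensureActive() // stop between chunks once cancelled
                    Thread.sleep(1_000)
                }

                println("processed - input: $input")
            }
        }
}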
j
Mmh yeah, actually I experimented with the interruption option, and it has the same problem. That said, you don't have to go all the way to keeping the reference to the job manually. You can achieve the same result by spawning a different coroutine and still waiting for it, by wrapping the processing code in coroutineScope { launch { ... } } (it seems stupid, but it solves the issue the same way).
a
I tried that but no luck unless I am misunderstanding:
suspend fun main() {
    val inputFlow = MutableStateFlow(0)
    coroutineScope {
        launch {
            inputFlow
                .collectLatest { input ->
                    println("processing - input: $input")
                    coroutineScope {
                        launch {
                            repeat(4) {
                                println("I am running still for $input")
                                Thread.sleep(1_000)
                            }
                            
                            println("processed - input: $input")
                        }
                    }
                }
        }

        launch {
            repeat(10) {
                val newValue = inputFlow.value + 1
                println("Publishing $newValue")
                inputFlow.value = newValue

                delay(1_000)
            }
        }
    }
}
prints
Publishing 1
processing - input: 1
I am running still for 1
Publishing 2
I am running still for 1
Publishing 3
I am running still for 1
Publishing 4
I am running still for 1
Publishing 5
processed - input: 1
processing - input: 2
processing - input: 5
I am running still for 5
I am running still for 5
Publishing 6
I am running still for 5
Publishing 7
Publishing 8
I am running still for 5
processed - input: 5
Publishing 9
processing - input: 6
processing - input: 9
I am running still for 9
I am running still for 9
Publishing 10
I am running still for 9
I am running still for 9
processed - input: 9
processing - input: 10
I am running still for 10
I am running still for 10
I am running still for 10
I am running still for 10
processed - input: 10
j
But here you also removed the isActive check (or interruption mechanism).
a
d'oh
Publishing 1
processing - input: 1
I am running still for 1
Publishing 2
processing - input: 2
I am running still for 2
Publishing 3
processing - input: 3
I am running still for 3
that seems to be correct now, thx!
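The corrected code was not posted. A sketch of what it presumably looks like, with the cancellation check restored inside the nested coroutineScope + launch, follows. The helper name runNestedLaunchDemo, the log list, and the shortened timings are assumptions made for this illustration.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.launch

// Hypothetical demo helper: returns the log lines instead of printing them directly.
suspend fun runNestedLaunchDemo(): List<String> {
    val log = CopyOnWriteArrayList<String>()
    val inputFlow = MutableStateFlow(0)

    coroutineScope {
        val collector = launch {
            inputFlow.collectLatest { input ->
                log += "processing - input: $input"
                // The collectLatest block suspends here while the child does the blocking work.
                coroutineScope {
                    launch {
                        repeat(4) {
                            ensureActive() // the check that was missing above
                            log += "I am running still for $input"
                            Thread.sleep(250) // stands in for one chunk of CPU-bound work
                        }
                        log += "processed - input: $input"
                    }
                }
            }
        }

        repeat(3) {
            inputFlow.value += 1 // publish 1, 2, 3
            delay(400)
        }
        delay(1_500)       // leave time for the last value to finish
        collector.cancel() // a StateFlow never completes, so stop collecting explicitly
    }
    return log
}

suspend fun main() {
    runNestedLaunchDemo().forEach(::println)
}
```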
j
If you don't want to pass the context around and use isActive, here is the interruption-based version:
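import java.security.SecureRandom
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.launch
import kotlinx.coroutines.runInterruptible

suspend fun main() {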
    val inputFlow = MutableStateFlow(-1)

    coroutineScope {
        launch {
            inputFlow
                .collectLatest { input ->
                    println("Receiving value=$input from thread ${Thread.currentThread().name}")

                    coroutineScope {
                        launch {
                            runInterruptible {
                                println("Starting recursion on thread ${Thread.currentThread().name}...")
                                cpuIntensiveRecursion(input)
                            }
                        }
                    }
                }
        }

        launch {
            repeat(10) {
                println("Publishing value=$it from thread ${Thread.currentThread().name}")
                inputFlow.value = it

                delay(1_000)
            }
        }
    }
}

private fun cpuIntensiveRecursion(input: Int) {
    // the loop simulates the point where the recursive code would check for interruption
    while (!Thread.currentThread().isInterrupted) {
        cpuIntensiveCall()
        println("Still running on input $input... (thread : ${Thread.currentThread().name})")
    }
    println("Stopped input $input processing due to interruption")
}

private fun cpuIntensiveCall() {
    repeat(50_000) {
        val byteArray = ByteArray(2048)
        SecureRandom.getInstanceStrong().nextBytes(byteArray)
        if (byteArray.hashCode() < 1000) {
            println("LUCKY STRIKE!")
        }
    }
}
a
oh, nice. Thx a lot
t
Did you actually try using yield() and seeing what the performance impact was for your use case?
a
I'll double check that. I was also tweaking other stuff so suspending overhead may have been a red herring
u
Did you try scheduling the tight loop on the default dispatcher using withContext, as @Chantry Cargill mentioned? You did not mention which dispatcher this all runs on. If your dispatcher is single-threaded, or all your threads are in use, your tight loop might starve collectLatest, preventing it from canceling your collector.
j
Well, at least in this example we know it already uses the default dispatcher, since it's in suspend fun main (which doesn't have a dispatcher) and uses launch. AFAICT it uses the same coroutine to run the collectLatest lambda and to manage the cancellation, which makes it synchronously alternate between the collect body and the collectLatest internals at suspension points, but when there are none, it's kinda stuck. I think that's why the nested coroutineScope + launch seems to work: I believe it creates a new coroutine that can work in parallel with the collectLatest internals, thus allowing the cancellation, even though the collect lambda still has to wait for that coroutine.
u
If it runs on the same coroutine, I do not understand how parts of the same coroutine could be interleaved. I guess it takes some reading of the code 🙂
a
Actual code runs on the default dispatcher as well
j
@uli right. I might have brain-farted on this one. But then I'm still confused as to why it would not dispatch to other threads of the default dispatcher, and instead stays stuck on the blocking call like this.
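One possible explanation, offered as an assumption that should be checked against the kotlinx.coroutines source: if collectLatest starts its block with CoroutineStart.UNDISPATCHED, the block runs on the collecting thread until its first suspension point, so a block that never suspends would keep the internals from ever seeing the next value. The sketch below only demonstrates that general UNDISPATCHED behaviour; launchLatencyMillis is a hypothetical helper.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Measures how long launch() itself takes to return when the child blocks for 500 ms.
fun launchLatencyMillis(start: CoroutineStart): Long = runBlocking(Dispatchers.Default) {
    val begin = System.nanoTime()
    val job = launch(start = start) {
        Thread.sleep(500) // blocking call, no suspension point
    }
    val elapsed = (System.nanoTime() - begin) / 1_000_000
    job.join()
    elapsed
}

fun main() {
    // DEFAULT dispatches the child to the pool, so launch() returns almost immediately.
    println("DEFAULT: ${launchLatencyMillis(CoroutineStart.DEFAULT)} ms")
    // UNDISPATCHED runs the child in the caller until it suspends, which here is never.
    println("UNDISPATCHED: ${launchLatencyMillis(CoroutineStart.UNDISPATCHED)} ms")
}
```

If that assumption holds, it would also be consistent with the nested coroutineScope { launch { ... } } working: the inner launch uses the default start mode, so the block reaches a suspension point right away.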