Austin
05/15/2023, 4:08 PMsuspend
functions so I wanted to check the isActive
flag, but that is always true
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
suspend fun main() {
val inputFlow = MutableStateFlow(false)
coroutineScope {
launch {
inputFlow
.collectLatest { input ->
coroutineScope {
println("processing - input: $input")
while (isActive) {
println("is active")
Thread.sleep(1_000) // simulate CPU tasks
}
println("processed - input: $input")
}
}
}
launch {
repeat(10) {
println("Publishing")
inputFlow.value = !inputFlow.value
delay(1_000)
}
}
}
}
Prints:
Publishing
processing - input: true
is active
Publishing
is active
Publishing
is active
Publishing
is active
<repeat forever>
Joffrey
05/15/2023, 4:12 PMisActive
to return false here? Nothing cancels any scope in this snippetAustin
05/15/2023, 4:13 PMJoffrey
05/15/2023, 4:14 PMcollect
, not collectLatest
Austin
05/15/2023, 4:17 PMPublishing
processing - input: true
Publishing
processing - input: false
Publishing
processed - input: false
processing - input: true
Publishing
processed - input: true
processing - input: false
Publishing
but I am unable to convert the algorithm to use suspend methods (especially since there is no suspending code in it (it has no I/O, just a lot of number crunching in very large data structures)Chantry Cargill
05/15/2023, 4:24 PMwithContext(Dispatchers.Default) {
// do your business
}
Joffrey
05/15/2023, 4:27 PMyield()
within a while(true)
rather than checking isActive
. This allows the coroutine to free the thread so other coroutines can do stuff. Maybe in your case the collectLatest
internals don't get an opportunity to cancel your loop if you don't suspend anywhere. When I add yield()
, it behaves as intended.Austin
05/15/2023, 4:30 PMsuspend
since its highly recursive algorithm and this said it makes the performance plummet (https://stackoverflow.com/questions/48129878/kotlin-suspend-function-recursive-call)Joffrey
05/15/2023, 4:31 PMwhile
loop. I thought your recursive algorithm was represented with the sleep, not the loop, and you wanted to cancel between the callsJoffrey
05/15/2023, 4:32 PMJoffrey
05/15/2023, 4:32 PMisActive
worked as you expected, how would you have expected to get the value of isActive
from a non-suspend function? Did you intend to pass the job down in the recursive calls?Austin
05/15/2023, 4:33 PMisActive
route I was trying to go do would have let me pass the corotuineContext
around and not pay the cost of the suspending machinery each function callJoffrey
05/15/2023, 4:34 PMJoffrey
05/15/2023, 4:35 PMrunBlocking
)Austin
05/15/2023, 4:42 PMvar latestJob: Job? = null
inputFlow
.collectLatest { input ->
println("processing - input: $input")
latestJob?.cancel()
latestJob = async {
repeat(4) {
ensureActive()
Thread.sleep(1_000)
}
println("processed - input: $input")
}
}
}
Joffrey
05/15/2023, 5:24 PMcoroutineScope { launch { ... }}
(it seems stupid but it solves the issue the same way)Austin
05/15/2023, 5:30 PMsuspend fun main() {
val inputFlow = MutableStateFlow(0)
coroutineScope {
launch {
inputFlow
.collectLatest { input ->
println("processing - input: $input")
coroutineScope {
launch {
repeat(4) {
println("I am running still for $input")
Thread.sleep(1_000)
}
println("processed - input: $input")
}
}
}
}
launch {
repeat(10) {
val newValue = inputFlow.value + 1
println("Publishing $newValue")
inputFlow.value = newValue
delay(1_000)
}
}
}
}
prints
Publishing 1
processing - input: 1
I am running still for 1
Publishing 2
I am running still for 1
Publishing 3
I am running still for 1
Publishing 4
I am running still for 1
Publishing 5
processed - input: 1
processing - input: 2
processing - input: 5
I am running still for 5
I am running still for 5
Publishing 6
I am running still for 5
Publishing 7
Publishing 8
I am running still for 5
processed - input: 5
Publishing 9
processing - input: 6
processing - input: 9
I am running still for 9
I am running still for 9
Publishing 10
I am running still for 9
I am running still for 9
processed - input: 9
processing - input: 10
I am running still for 10
I am running still for 10
I am running still for 10
I am running still for 10
processed - input: 10
Joffrey
05/15/2023, 5:31 PMisActive
check (or interruption mechanism)Austin
05/15/2023, 5:32 PMAustin
05/15/2023, 5:32 PMPublishing 1
processing - input: 1
I am running still for 1
Publishing 2
processing - input: 2
I am running still for 2
Publishing 3
processing - input: 3
I am running still for 3
that seems to be correct now, thx!Joffrey
05/15/2023, 5:33 PMisActive
, here is the interruption-based version:
suspend fun main() {
val inputFlow = MutableStateFlow(-1)
coroutineScope {
launch {
inputFlow
.collectLatest { input ->
println("Receiving value=$input from thread ${Thread.currentThread().name}")
coroutineScope {
launch {
runInterruptible {
println("Starting recursion on thread ${Thread.currentThread().name}...")
cpuIntensiveRecursion(input)
}
}
}
}
}
launch {
repeat(10) {
println("Publishing value=$it from thread ${Thread.currentThread().name}")
inputFlow.value = it
delay(1_000)
}
}
}
}
private fun cpuIntensiveRecursion(input: Int) {
// the loop simulates the point where the recursive code would check for interruption
while (!Thread.currentThread().isInterrupted) {
cpuIntensiveCall()
println("Still running on input $input... (thread : ${Thread.currentThread().name})")
}
println("Stopped input $input processing due to interruption")
}
private fun cpuIntensiveCall() {
repeat(50_000) {
val byteArray = ByteArray(2048)
SecureRandom.getInstanceStrong().nextBytes(byteArray)
if (byteArray.hashCode() < 1000) {
println("LUCKY STRIKE!")
}
}
}
Austin
05/15/2023, 5:35 PMTrevor Stone
05/15/2023, 5:47 PMyield()
and seeing what the performance impact actually was for your use-case?Austin
05/15/2023, 5:57 PMuli
05/17/2023, 7:02 AMwithContext
as @Chantry Cargill mentioned.
You did not mention which dispatcher this all runs on. If your dispatcher is single threaded, or all your threads are in use, your tight loop might starve collectLatest
from canceling your collector.Joffrey
05/17/2023, 7:25 AMsuspend fun main
(which doesn't have a dispatcher) and uses launch
. AFAICT it uses the same coroutine to run the collectLatest
lambda and to manage the cancellation, which makes it synchronously alternate between the collect body and the collectLatest
internals on suspension points, but when there is none, it's kinda stuck. I think that's why the nested coroutineScope
+ launch
seems to work, I believe it creates a new coroutine that can work in parallel of the collectLatest internals, thus allowing the cancellation, even though the collect lambda still has to wait for that coroutine.uli
05/17/2023, 7:28 AMAustin
05/17/2023, 7:13 PMJoffrey
05/17/2023, 9:13 PM