Why cancelling `channelFlow` does not cancel the c...
# coroutines
v
Why cancelling
channelFlow
does not cancel the child?
Copy code
suspend fun run() = channelFlow {
    suspendCancellableCoroutine<Unit> { cont ->
        cont.invokeOnCancellation { println("Cancelled.") }
        cont.resume(Unit) { }
    }

    for (x in 1..10) {
        send(x)
    }
}

suspend fun main(): Unit = coroutineScope {
    withTimeout(1000) {
        run().collect {
            println(it)
            delay(2000)
        }
    }
}
It produces:
Copy code
1
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms
	at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:158)
	at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:128)
	at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:497)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
	at kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:68)
	at java.base/java.lang.Thread.run(Thread.java:834)

Process finished with exit code 1
s
I don’t see where you cancel the flow, except for the withTimeout in the main function.
o
you should try printing out something after that
for
loop, my bet is that the channelFlow immediately finishes its block (because the channel has a buffer size of 16 by default); so the channelFlow coroutine is complete and there's nothing to cancel in addition,
Copy code
suspendCancellableCoroutine<Unit> { cont ->
        cont.invokeOnCancellation { println("Cancelled.") }
        cont.resume(Unit) { }
    }
is a little weird, normally if you wanted to add some listener for cancellation you'd just add it to the current context Job, not suspend for a continuation (not even sure if that works?)
s
And if you want to see if the child (i.e. the flow’s producer) gets cancelled, it can be done simpler without the suspendCancellableCoroutine:
Copy code
channelFlow {
    try {
        ...
    } catch (c: CancellationException) {
        println("Cancelled")
        throw c
    }
}
t
interesting, after putting some logs in your sample, i see this output:
Copy code
before for
1
during for
during for
during for
during for
during for
during for
during for
during for
during for
during for
after for

Exception ...
v
The use of
suspendCancellableCoroutine
is here as a simplified version of a real thing, which does need it. I've just made up this minimal repro to demonstraite that it is not cancelled when the flow is.
o
yep, that's what I expected
👍🏼 1
a
channelFlow
is buffered, if you were expecting non-buffered mechanics (which would behave how you expect) use
flow
instead with
emit
instead of
send
v
What I'm trying to implement: a function, which runs a Process and returns Flow. It should be safe, i.e. it the flow cancelled, the process should be killed, which is implemented via
suspendCancellableCoroutine
@octylFractal yes, I tried flow {} first, but it does not allow
emit
from different
launch
which I need
s
I don’t think you see “Cancelled” printed out because it immediately finishes.
o
I'm curious, what JDK version are you on?
I'm trying to design a "good" solution that works how you expect, but there are some Process API differences in 9+ that make it easier
t
@vaskir do you mean that you want the Process to start running, and then continue to run in the background as the Flow/Channel emits until cancelled?
f
Is the process returning something which is emitted to flow? @vaskir
Or just starting and a separate job?
v
@octylFractal JDK 11
@Tashyes. I want it to return a Flow, and it (and only it) should be the only way to cancel (destroy) the process.
Could anybody explain what's happening?
Why the cancellation is lost?
o
It looks pretty simple to me -- the process finished, it's done executing before one second is up, so there's nothing to cancel
when the timeout is short enough, then the cancel propagates before the process finishes, so you see the cancellation
v
ah, it's just waited for process to exit and `resume`s. Thanks!
I'm stupid 🙂
o
btw, for simplicity, I suggest changing your process functions to the following:
Copy code
fun startProcess(cmd: String, workingDir: File): Process {
    return ProcessBuilder(*cmd.split("\\s".toRegex()).toTypedArray())
        .directory(workingDir)
        .start()
}

suspend fun Process.wait() {
    try {
        onExit().asDeferred().await()
        val returnCode = exitValue()
        if (returnCode != 0) {
            throw IOException("Process $this exits with non zero code $returnCode.")
        }
    } finally {
        destroy()
    }
}
they're much simpler while accomplishing the same goal
v
@octylFractal thanks! But
asDeferred
is not resolved.
wait. it's not asDeferred, it's onExit that's not found: Unresolved reference: onExit
the editor highlights
asDeferred
, the compiler said
onExit
does not exists. It's weird.
message has been deleted
message has been deleted
hmmm. I think it's on Mac only.
open jdk 14 on mac, open jdk 11 on windows. Why the difference?
on windows:
wow it's really strange.
what am I missing?
build.gradle is the same.
o
onExit() is on the Process class in the jdk since 9, and asDeferred is part of the jdk8 coroutine extensions, so you'll need to add the appropriate library if you don't have it
I suspect on Mac three it's not using 14 for real, otherwise I don't know why that would be missing
v
@octylFractal yeah, adding jdk8 package solved the problem on Windows, but on Mac it still cannot resolve
onExit
o
yea, idk, like I said it's an API since jdk 9 so I don't know why it would be missing
v
Anyway, big thanks for the simplification, I learn a lot from it.
👍 1