#coroutines

vaskir

06/05/2020, 6:47 PM
Why does cancelling `channelFlow` not cancel the child?
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun run() = channelFlow {
    suspendCancellableCoroutine<Unit> { cont ->
        cont.invokeOnCancellation { println("Cancelled.") }
        cont.resume(Unit) { }
    }

    for (x in 1..10) {
        send(x)
    }
}

suspend fun main(): Unit = coroutineScope {
    withTimeout(1000) {
        run().collect {
            println(it)
            delay(2000)
        }
    }
}
```
It produces:
```
1
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms
	at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:158)
	at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:128)
	at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:497)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
	at kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:68)
	at java.base/java.lang.Thread.run(Thread.java:834)

Process finished with exit code 1
```

streetsofboston

06/05/2020, 6:52 PM
I don’t see where you cancel the flow, except for the withTimeout in the main function.

octylFractal

06/05/2020, 6:53 PM
you should try printing out something after that `for` loop. my bet is that the channelFlow immediately finishes its block (because the channel has a buffer size of 64 by default), so the channelFlow coroutine is complete and there's nothing to cancel.
in addition,
```kotlin
suspendCancellableCoroutine<Unit> { cont ->
    cont.invokeOnCancellation { println("Cancelled.") }
    cont.resume(Unit) { }
}
```
is a little weird. normally, if you wanted to add some listener for cancellation, you'd just add it to the current context Job, not suspend for a continuation (not even sure if that works?)
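One way to read the "add it to the current context Job" suggestion is sketched below. The `ticks` helper and its `onCancelled` callback are hypothetical names made up for illustration; the sketch assumes a producer that is still running when the collector times out, so there is something left to cancel.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: an endless producer that reports its own cancellation.
fun ticks(onCancelled: () -> Unit): Flow<Int> = channelFlow {
    // Listen on the producer's own Job instead of suspending on a continuation.
    coroutineContext[Job]?.invokeOnCompletion { cause ->
        if (cause is CancellationException) onCancelled()
    }
    var i = 0
    while (true) {
        send(i++)
        delay(100) // keeps the block alive past the collector's timeout
    }
}

fun main() {
    runBlocking {
        // The timeout cancels the collector, which in turn cancels the producer.
        withTimeoutOrNull(350) {
            ticks { println("Cancelled.") }.collect { println(it) }
        }
    }
}
```

Because the producer never finishes on its own, the handler should fire once the timeout cancels the collector, which is the case the original repro never reaches.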

streetsofboston

06/05/2020, 6:55 PM
And if you want to see whether the child (i.e. the flow’s producer) gets cancelled, it can be done more simply, without the `suspendCancellableCoroutine`:
```kotlin
channelFlow {
    try {
        ...
    } catch (c: CancellationException) {
        println("Cancelled")
        throw c
    }
}
```

Tash

06/05/2020, 6:58 PM
interesting, after putting some logs in your sample, i see this output:
```
before for
1
during for
during for
during for
during for
during for
during for
during for
during for
during for
during for
after for

Exception ...
```

vaskir

06/05/2020, 6:58 PM
The use of `suspendCancellableCoroutine` here is a simplified version of the real thing, which does need it. I've just made up this minimal repro to demonstrate that it is not cancelled when the flow is.

octylFractal

06/05/2020, 6:59 PM
yep, that's what I expected
👍🏼 1
a `channelFlow` is buffered. if you were expecting non-buffered mechanics (which would behave how you expect), use `flow` instead, with `emit` instead of `send`
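The difference can be made visible with a small experiment. This is only a sketch: `producedBeforeTimeout` is a hypothetical helper, and the 300 ms timeout and 1 s collector delay are arbitrary values chosen so that the collector is cancelled while handling the first element.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.AtomicInteger

// Collects the flow returned by `build` with a slow collector under a timeout,
// and returns how many values the producer attempted to produce.
fun producedBeforeTimeout(build: (AtomicInteger) -> Flow<Int>): Int {
    val produced = AtomicInteger()
    runBlocking {
        withTimeoutOrNull(300) {
            build(produced).collect { delay(1_000) } // slow collector
        }
    }
    return produced.get()
}

fun main() {
    // Buffered: send() does not wait for the collector while the buffer has room.
    val buffered = producedBeforeTimeout { n ->
        channelFlow { for (x in 1..10) { n.incrementAndGet(); send(x) } }
    }
    // Sequential: emit() returns only after the collector has handled the value.
    val sequential = producedBeforeTimeout { n ->
        flow { for (x in 1..10) { n.incrementAndGet(); emit(x) } }
    }
    println("channelFlow produced $buffered, flow produced $sequential")
}
```

With the default buffer size, the `channelFlow` producer should get through all ten values before the timeout, while the `flow` producer should be cancelled inside its first `emit`.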

vaskir

06/05/2020, 7:00 PM
What I'm trying to implement: a function which runs a Process and returns a Flow. It should be safe, i.e. if the flow is cancelled, the process should be killed, which is implemented via `suspendCancellableCoroutine`.
@octylFractal yes, I tried `flow {}` first, but it does not allow `emit` from a different `launch`, which I need
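A rough sketch of that goal, not taken from the thread: `processOutput` is a hypothetical helper, the `ping localhost` command is only an illustration, and the code assumes JDK 9+ (for `Process.onExit()`) plus the `await` extension from the kotlinx-coroutines-jdk8 integration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.future.await
import java.io.IOException

// Hypothetical helper: streams the merged stdout/stderr of a process line by line
// and destroys the process when the flow is cancelled or completes.
fun processOutput(vararg cmd: String): Flow<String> = channelFlow {
    val process = ProcessBuilder(*cmd).redirectErrorStream(true).start()
    try {
        // channelFlow permits send() from a child coroutine, unlike flow { emit() }.
        launch(Dispatchers.IO) {
            try {
                process.inputStream.bufferedReader().useLines { lines ->
                    for (line in lines) send(line)
                }
            } catch (e: IOException) {
                // destroy() may close the stream during cancellation;
                // rethrow only if this coroutine was not cancelled.
                if (isActive) throw e
            }
        }
        process.onExit().await() // suspends until exit and is cancellable
    } finally {
        process.destroy() // no action is taken if the process already exited
    }
}

fun main() {
    runBlocking {
        // Illustrative command; the timeout cancels the flow and kills the process.
        withTimeoutOrNull(1_000) {
            processOutput("ping", "localhost").collect { println(it) }
        }
    }
}
```

The cleanup lives in `finally` rather than in a continuation callback, so it runs whether the process exits by itself or the collector is cancelled while the producer is still waiting.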

streetsofboston

06/05/2020, 7:01 PM
I think you don’t see “Cancelled” printed out because the block finishes immediately.

octylFractal

06/05/2020, 7:05 PM
I'm curious, what JDK version are you on?
I'm trying to design a "good" solution that works how you expect, but there are some Process API differences in 9+ that make it easier

Tash

06/05/2020, 7:08 PM
@vaskir do you mean that you want the Process to start running, and then continue to run in the background as the Flow/Channel emits until cancelled?

fatih

06/05/2020, 7:10 PM
Is the process returning something which is emitted to flow? @vaskir
Or is it just started as a separate job?

vaskir

06/06/2020, 2:07 PM
@octylFractal JDK 11
@Tash yes. I want it to return a Flow, and that Flow should be the only way to cancel (destroy) the process.
Could anybody explain what's happening?
Why is the cancellation lost?

octylFractal

06/06/2020, 6:54 PM
It looks pretty simple to me -- the process finished, it's done executing before one second is up, so there's nothing to cancel
when the timeout is short enough, then the cancel propagates before the process finishes, so you see the cancellation

vaskir

06/06/2020, 7:02 PM
ah, it just waited for the process to exit and then `resume`d. Thanks!
I'm stupid 🙂

octylFractal

06/06/2020, 7:43 PM
btw, for simplicity, I suggest changing your process functions to the following:
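```kotlin
import kotlinx.coroutines.future.asDeferred
import java.io.File
import java.io.IOException

// Splits the command on whitespace and starts it in the given working directory.
fun startProcess(cmd: String, workingDir: File): Process {
    return ProcessBuilder(*cmd.split("\\s".toRegex()).toTypedArray())
        .directory(workingDir)
        .start()
}

// Suspends until the process exits; destroys it if the caller is cancelled first.
suspend fun Process.wait() {
    try {
        onExit().asDeferred().await() // onExit() requires JDK 9+
        val returnCode = exitValue()
        if (returnCode != 0) {
            throw IOException("Process $this exited with non-zero code $returnCode.")
        }
    } finally {
        destroy()
    }
}
```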
they're much simpler while accomplishing the same goal

vaskir

06/07/2020, 7:45 AM
@octylFractal thanks! But `asDeferred` is not resolved.
wait. it's not `asDeferred`, it's `onExit` that's not found: Unresolved reference: onExit
the editor highlights `asDeferred`, but the compiler says `onExit` does not exist. It's weird.
hmmm. I think it's on Mac only.
open jdk 14 on mac, open jdk 11 on windows. Why the difference?
on windows:
wow it's really strange.
what am I missing?
build.gradle is the same.

octylFractal

06/07/2020, 4:16 PM
`onExit()` has been on the `Process` class in the JDK since 9, and `asDeferred` is part of the jdk8 coroutine extensions, so you'll need to add the appropriate library if you don't have it
I suspect that on Mac it's not using 14 for real; otherwise I don't know why that would be missing
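A quick way to test that suspicion is to ask the running JVM directly. The sketch below uses a hypothetical `jdkReport` function; it reports the runtime JVM only, so it is most informative when launched through the same Gradle build or IDE configuration that fails to resolve `onExit`.

```kotlin
// Reports which JVM is running this code and whether Process.onExit() exists on it.
fun jdkReport(): String {
    val version = System.getProperty("java.version")
    val home = System.getProperty("java.home")
    // Looked up via reflection, so this also compiles against JDK 8.
    val hasOnExit = Process::class.java.methods.any { it.name == "onExit" }
    return "java.version=$version, java.home=$home, Process.onExit available=$hasOnExit"
}

fun main() {
    println(jdkReport())
}
```

If the reported version is below 9, the build is likely picking up a different JDK than the one installed as "14".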

vaskir

06/07/2020, 5:31 PM
@octylFractal yeah, adding the jdk8 package solved the problem on Windows, but on Mac it still cannot resolve `onExit`
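For reference, adding the jdk8 integration in a Gradle Kotlin DSL build might look like the fragment below. This is an assumption-laden sketch: the thread mentions a `build.gradle` (Groovy) file, and the version number is only an example from around the time of this conversation, not something stated in the thread.

```kotlin
// build.gradle.kts (assumed Kotlin DSL; version shown is illustrative)
dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7")
    // Provides asDeferred() and await() for CompletionStage/CompletableFuture.
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:1.3.7")
}
```

This only covers `asDeferred`; `onExit` comes from the JDK itself, so it does not explain the remaining Mac failure.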

octylFractal

06/07/2020, 5:31 PM
yea, idk, like I said it's an API since jdk 9 so I don't know why it would be missing

vaskir

06/07/2020, 5:32 PM
Anyway, big thanks for the simplification, I learned a lot from it.
👍 1