vaskir
06/05/2020, 6:47 PMchannelFlow
does not cancel the child?
suspend fun run() = channelFlow {
suspendCancellableCoroutine<Unit> { cont ->
cont.invokeOnCancellation { println("Cancelled.") }
cont.resume(Unit) { }
}
for (x in 1..10) {
send(x)
}
}
suspend fun main(): Unit = coroutineScope {
withTimeout(1000) {
run().collect {
println(it)
delay(2000)
}
}
}
It produces:
1
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms
at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:158)
at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:128)
at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:497)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
at kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:68)
at java.base/java.lang.Thread.run(Thread.java:834)
Process finished with exit code 1
streetsofboston
06/05/2020, 6:52 PMoctylFractal
06/05/2020, 6:53 PMfor
loop, my bet is that the channelFlow immediately finishes its block (because the channel has a buffer size of 16 by default); so the channelFlow coroutine is complete and there's nothing to cancel
in addition,
suspendCancellableCoroutine<Unit> { cont ->
cont.invokeOnCancellation { println("Cancelled.") }
cont.resume(Unit) { }
}
is a little weird, normally if you wanted to add some listener for cancellation you'd just add it to the current context Job, not suspend for a continuation (not even sure if that works?)streetsofboston
06/05/2020, 6:55 PMchannelFlow {
try {
...
} catch (c: CancellationException) {
println("Cancelled")
throw c
}
}
Tash
06/05/2020, 6:58 PMbefore for
1
during for
during for
during for
during for
during for
during for
during for
during for
during for
during for
after for
Exception ...
vaskir
06/05/2020, 6:58 PMsuspendCancellableCoroutine
is here as a simplified version of a real thing, which does need it. I've just made up this minimal repro to demonstraite that it is not cancelled when the flow is.octylFractal
06/05/2020, 6:59 PMoctylFractal
06/05/2020, 6:59 PMchannelFlow
is buffered, if you were expecting non-buffered mechanics (which would behave how you expect) use flow
instead with emit
instead of send
vaskir
06/05/2020, 7:00 PMsuspendCancellableCoroutine
vaskir
06/05/2020, 7:01 PMemit
from different launch
vaskir
06/05/2020, 7:01 PMstreetsofboston
06/05/2020, 7:01 PMoctylFractal
06/05/2020, 7:05 PMoctylFractal
06/05/2020, 7:06 PMTash
06/05/2020, 7:08 PMfatih
06/05/2020, 7:10 PMfatih
06/05/2020, 7:11 PMvaskir
06/06/2020, 2:07 PMvaskir
06/06/2020, 2:08 PMvaskir
06/06/2020, 5:46 PMvaskir
06/06/2020, 5:47 PMvaskir
06/06/2020, 5:47 PMoctylFractal
06/06/2020, 6:54 PMoctylFractal
06/06/2020, 6:56 PMvaskir
06/06/2020, 7:02 PMvaskir
06/06/2020, 7:02 PMoctylFractal
06/06/2020, 7:43 PMfun startProcess(cmd: String, workingDir: File): Process {
return ProcessBuilder(*cmd.split("\\s".toRegex()).toTypedArray())
.directory(workingDir)
.start()
}
suspend fun Process.wait() {
try {
onExit().asDeferred().await()
val returnCode = exitValue()
if (returnCode != 0) {
throw IOException("Process $this exits with non zero code $returnCode.")
}
} finally {
destroy()
}
}
octylFractal
06/06/2020, 7:43 PMvaskir
06/07/2020, 7:45 AMasDeferred
is not resolved.vaskir
06/07/2020, 7:48 AMvaskir
06/07/2020, 7:49 AMasDeferred
, the compiler said onExit
does not exists. It's weird.vaskir
06/07/2020, 7:50 AMvaskir
06/07/2020, 7:50 AMvaskir
06/07/2020, 7:50 AMvaskir
06/07/2020, 7:51 AMvaskir
06/07/2020, 7:52 AMvaskir
06/07/2020, 7:53 AMvaskir
06/07/2020, 7:53 AMvaskir
06/07/2020, 7:55 AMoctylFractal
06/07/2020, 4:16 PMoctylFractal
06/07/2020, 4:17 PMvaskir
06/07/2020, 5:31 PMonExit
octylFractal
06/07/2020, 5:31 PMvaskir
06/07/2020, 5:32 PM