paulblessing
01/22/2020, 9:12 PM
[…] `suspendCancellableCoroutine` to cause `.join()` after cancellation to suspend until the Continuation is resumed / the background work is finished, similar to how things work when using `withContext` to switch threads?
I'm curious if I can get these to behave the same:
private suspend fun runViaThread(): String {
    return suspendCancellableCoroutine { continuation ->
        thread {
            println("Starting background work")
            Thread.sleep(2000)
            println("Finished background work")
            continuation.resume("done")
        }
    }
}

private suspend fun runViaWithContext(): String {
    return withContext(Dispatchers.IO) {
        println("Starting background work")
        Thread.sleep(2000)
        println("Finished background work")
        "done"
    }
}

@Test fun testViaThread() {
    /*
    Prints:
    Starting background work
    Before join "coroutine#2":StandaloneCoroutine{Cancelling}@5c7fa833
    After join "coroutine#2":StandaloneCoroutine{Cancelled}@5c7fa833
    Finished background work
    */
    runBlocking {
        val job = launch(Dispatchers.Default) { runViaThread() }
        delay(1000)
        job.cancel()
        println("Before join $job")
        job.join()
        println("After join $job")
        delay(3000)
    }
}

@Test fun testViaWithContext() {
    /*
    Prints:
    Starting background work
    Before join "coroutine#4":StandaloneCoroutine{Cancelling}@2b98378d
    Finished background work
    After join "coroutine#4":StandaloneCoroutine{Cancelled}@2b98378d
    */
    runBlocking {
        val job = launch(Dispatchers.Default) { runViaWithContext() }
        delay(1000)
        job.cancel()
        println("Before join $job")
        job.join()
        println("After join $job")
        delay(3000)
    }
}
bezrukov
01/22/2020, 10:23 PM
[…] `suspendCoroutine` instead of `suspendCancellableCoroutine`, both methods will behave the same.
bezrukov
01/22/2020, 10:28 PM
[…] `delay` instead of `Thread.sleep`. When you need a non-cancellable piece of code, you can wrap it in `withContext(NonCancellable)` explicitly, and it will work regardless of cancellation support inside the block.
paulblessing
01/22/2020, 10:45 PM
[…] `delay`. I'm actually trying to do something more along the lines of this; I was just trying to simplify as much as possible for asking the question.
suspend fun <T> ExecutorService.submitAndAwait(task: () -> T): T {
    return suspendCancellableCoroutine { continuation ->
        val future = submit<Unit> {
            try {
                val result = task()
                continuation.resume(result)
            } catch (e: Exception) {
                continuation.resumeWithException(e)
            }
        }
        continuation.invokeOnCancellation {
            future.cancel(true)
        }
    }
}
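For reference, the two suggestions made earlier in the thread (swapping `suspendCancellableCoroutine` for `suspendCoroutine`, and wrapping the call in `withContext(NonCancellable)`) can be sketched roughly as follows. This is a minimal sketch and not code from the thread: the names `runViaThreadUncancellable`, `runShielded`, and `joinMillisAfterCancel` are hypothetical, and `Thread.sleep` stands in for blocking legacy work. It assumes `kotlinx-coroutines-core` is on the classpath.

```kotlin
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.*

// Variant 1 (hypothetical name): suspendCoroutine has no cancellation hook,
// so the coroutine stays suspended until the worker thread calls resume().
suspend fun runViaThreadUncancellable(workMillis: Long): String =
    suspendCoroutine { continuation ->
        thread {
            Thread.sleep(workMillis) // stands in for blocking legacy work
            continuation.resume("done")
        }
    }

// Variant 2 (hypothetical name): keep the cancellable primitive, but run it
// under NonCancellable so outer cancellation does not resume it early.
suspend fun runShielded(workMillis: Long): String =
    withContext(NonCancellable) {
        suspendCancellableCoroutine<String> { continuation ->
            thread {
                Thread.sleep(workMillis)
                continuation.resume("done")
            }
        }
    }

// Helper (hypothetical): cancels the job shortly after it starts and
// returns how many milliseconds job.join() took afterwards.
fun joinMillisAfterCancel(block: suspend () -> String): Long = runBlocking {
    val job = launch(Dispatchers.Default) { block() }
    delay(100)
    job.cancel()
    val start = System.nanoTime()
    job.join()
    (System.nanoTime() - start) / 1_000_000
}
```

In both variants `join()` should only return once the worker thread has finished. The trade-off is that neither variant gives the worker any signal that cancellation happened, so there is no place to call `future.cancel(true)` and interrupt it.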
paulblessing
01/22/2020, 10:49 PM
[…] `withContext` internally. But I can't seem to get the `join` to ever wait the same way it does when using `withContext`.
paulblessing
01/23/2020, 1:02 AM
suspend fun <T> ExecutorService.submitAndAwait(task: () -> T): T {
    // TODO: Is there something that can go here that
    // - suspends the calling thread
    // - runs the task on a thread provided by this ExecutorService
    // - interrupts the worker thread when the coroutine context is cancelled, to interoperate nicely with legacy code
    //   that cooperates with cancellation by checking thread interruption
    // - otherwise behaves the same as if we'd just used withContext, such that calling job.join() after job.cancel()
    //   will not resume until the task has finished
    TODO()
}
bezrukov
01/23/2020, 6:28 AM
[…] `withContext`, thread interruption checks don't work at all, so it's a simple blocking call, because the dispatcher's thread isn't marked as interrupted. That's why `sleep` wasn't interrupted in your case.
paulblessing
01/23/2020, 11:37 AM
bezrukov
01/23/2020, 12:03 PM
val job = GlobalScope.launch {
    withContext(someDispatcher) {
        while (!Thread.interrupted()) { // <-- your code that checks interruption
            // do nothing, just spin
        }
    }
}
delay(100)
println("cancel")
job.cancel() // returns immediately; it only requests cancellation
println("after cancel")
job.join()
println("after join") // <-- will never be invoked, because the block inside withContext never finishes
[…] `submitAndAwait` extension looks good, but its behavior differs from your `withContext` expectations.
paulblessing
01/23/2020, 2:08 PM
// Let's say this is the existing code that I don't have control of.
fun existingCode(items: List<Item>) {
    for (item in items) {
        if (Thread.currentThread().isInterrupted) {
            println("[Worker] Interrupted, not processing any more items")
            return
        }
        println("[Worker] Start work for $item")
        nonTrivialWork(item)
        println("[Worker] Finished work for $item")
    }
}

@Test fun example() {
    val items = List(100) { index -> Item(id = index) }
    val job = GlobalScope.launch {
        try {
            executorService.submitAndAwait { existingCode(items) }
        } finally {
            // Would like this cleanup code to wait to execute until the worker had the chance to respond to interruption
            // i.e. would like to have the line "[Worker] Interrupted..." printed before the line "Cleaning up"
            withContext(NonCancellable) {
                println("Cleaning up")
                cleanUp()
            }
        }
    }
    runBlocking {
        delay(300) // Give the background work enough time to actually start before cancelling
        println("Cancelling the job")
        job.cancel()
        delay(1000)
    }
}
paulblessing
01/23/2020, 2:13 PM
[…] `withContext` is that if I did have the ability to update this existing code to take in the `Job` and use it as a cancellation token, I could use `withContext` and get it to behave the way I would like.
fun hypotheticalCode(items: List<Item>, job: Job) {
    for (item in items) {
        if (job.isCancelled) {
            println("[Worker] Job cancelled, not processing any more items")
            return
        }
        println("[Worker] Start work for $item")
        nonTrivialWork(item)
        println("[Worker] Finished work for $item")
    }
}

@Test fun example2() {
    val items = List(100) { index -> Item(id = index) }
    val job = GlobalScope.launch {
        try {
            withContext(executorService.asCoroutineDispatcher()) { hypotheticalCode(items, coroutineContext[Job]!!) }
        } finally {
            withContext(NonCancellable) {
                // In this example, the cleanup code does wait until the worker had the chance to respond to the job cancellation
                // So we _do_ get "[Worker] Job cancelled..." printed before the line "Cleaning up"
                println("Cleaning up")
                cleanUp()
            }
        }
    }
    runBlocking {
        delay(300)
        println("Cancelling the job")
        job.cancel()
        delay(5000)
    }
}
paulblessing
01/23/2020, 2:48 PM
[…] `withContext` call with an `async`/`await` (ignoring the IDE suggestion to replace it with `withContext`) causes the behavior to change such that the cleanup will happen before the worker has had a chance to respond to cancellation. It's even pointed out in the docs for `await` that cancellation causes immediate resumption.
Thanks for the help either way. If anyone can think of a way to pull off what I'm trying to do, I'm still all ears.
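One option that did not exist when this thread was written: later kotlinx.coroutines releases (1.3.6 onward, to my knowledge) added `runInterruptible`, which runs a blocking block in a given context, interrupts the running thread when the coroutine is cancelled, and otherwise completes like `withContext`. The following is a minimal sketch under that assumption, not a drop-in for the code above. `interruptAwareWork` and `demo` are hypothetical names, and the spin loop stands in for legacy code that polls the thread's interrupt flag.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import kotlinx.coroutines.*

// Hypothetical stand-in for legacy code that cooperates with cancellation
// only by checking the interrupt flag of its own thread.
fun interruptAwareWork(events: MutableList<String>) {
    while (!Thread.currentThread().isInterrupted) {
        Thread.yield() // pretend to process items
    }
    events += "worker interrupted"
}

// Runs the legacy work on an executor thread, cancels it, and records
// the order in which the worker, the cleanup, and the join complete.
fun demo(): List<String> {
    val events = CopyOnWriteArrayList<String>()
    val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    try {
        runBlocking {
            val job = launch(Dispatchers.Default) {
                try {
                    // Cancellation interrupts the executor thread running the block.
                    runInterruptible(dispatcher) { interruptAwareWork(events) }
                } finally {
                    events += "cleanup"
                }
            }
            delay(200) // give the worker time to start before cancelling
            job.cancelAndJoin()
            events += "joined"
        }
    } finally {
        dispatcher.close() // also shuts down the underlying executor
    }
    return events
}
```

With this sketch, `demo()` should record the worker's reaction to interruption first, then the cleanup, then the join, which is the ordering asked for above. The design choice is to let the library own the interrupt bookkeeping instead of hand-rolling it around `suspendCancellableCoroutine`.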