#coroutines

paulblessing

01/22/2020, 9:12 PM
Is there a way to configure `suspendCancellableCoroutine` to cause `.join()` after cancellation to suspend until the Continuation is resumed / the background work is finished, similar to how things work when using a `withContext` to switch threads? I'm curious if I can get these to behave the same:
private suspend fun runViaThread(): String {
  return suspendCancellableCoroutine { continuation ->
    thread {
      println("Starting background work")
      Thread.sleep(2000)
      println("Finished background work")
      continuation.resume("done")
    }
  }
}

private suspend fun runViaWithContext(): String {
  return withContext(Dispatchers.IO) {
    println("Starting background work")
    Thread.sleep(2000)
    println("Finished background work")
    "done"
  }
}

@Test fun testViaThread() {
  /*
  Prints:
    Starting background work
    Before join "coroutine#2":StandaloneCoroutine{Cancelling}@5c7fa833
    After join "coroutine#2":StandaloneCoroutine{Cancelled}@5c7fa833
    Finished background work
   */
  runBlocking {
    val job = launch(Dispatchers.Default) { runViaThread() }

    delay(1000)
    job.cancel()
    println("Before join $job")
    job.join()
    println("After join $job")

    delay(3000)
  }
}

@Test fun testViaWithContext() {
  /*
  Prints:
    Starting background work
    Before join "coroutine#4":StandaloneCoroutine{Cancelling}@2b98378d
    Finished background work
    After join "coroutine#4":StandaloneCoroutine{Cancelled}@2b98378d
   */
  runBlocking {
    val job = launch(Dispatchers.Default) { runViaWithContext() }

    delay(1000)
    job.cancel()
    println("Before join $job")
    job.join()
    println("After join $job")

    delay(3000)
  }
}

bezrukov

01/22/2020, 10:23 PM
If you switch to `suspendCoroutine` instead of `suspendCancellableCoroutine`, both methods will behave the same.
But it's recommended to support cooperative cancellation (you don't support it), otherwise you lose one of the biggest advantages of coroutines. E.g. with coroutines you need to use `delay` instead of `Thread.sleep`. When you need a non-cancellable piece of code, you can wrap it in `withContext(NonCancellable)` explicitly and it will work regardless of cancellation support inside the block.
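To make the `suspendCoroutine` suggestion concrete, here is a minimal sketch. The names `runViaThreadNonCancellable` and `cancelAndJoinOrder` are made up for illustration, and the sleep is shortened to 500 ms. It only demonstrates that `join()` waits once the suspension point is no longer cancellable; it does not add any cancellation support.

```kotlin
import kotlinx.coroutines.*
import java.util.Collections
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

// Hypothetical variant of runViaThread: suspendCoroutine ignores cancellation,
// so the caller stays suspended until the thread calls resume.
suspend fun runViaThreadNonCancellable(events: MutableList<String>): String =
    suspendCoroutine { continuation ->
        thread {
            Thread.sleep(500) // stand-in for the background work
            events.add("finished background work")
            continuation.resume("done")
        }
    }

// Cancels the job mid-work and records the order in which things happen.
fun cancelAndJoinOrder(): List<String> {
    val events = Collections.synchronizedList(mutableListOf<String>())
    runBlocking {
        val job = launch(Dispatchers.Default) { runViaThreadNonCancellable(events) }
        delay(100)
        job.cancel()
        job.join() // returns only after the background thread has resumed the continuation
        events.add("after join")
    }
    return events.toList()
}

fun main() {
    println(cancelAndJoinOrder()) // [finished background work, after join]
}
```

The trade-off is the one described above: the background thread is never told about the cancellation, so the work always runs to completion.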

paulblessing

01/22/2020, 10:45 PM
The code I'm actually trying to implement is meant to support cancellation, and I'm definitely aware of using `delay`. I'm actually trying to do something more along the lines of this - was just trying to simplify as much as possible for asking the question.
suspend fun <T> ExecutorService.submitAndAwait(task: () -> T): T {
  return suspendCancellableCoroutine { continuation ->
    val future = submit<Unit> {
      try {
        val result = task()
        continuation.resume(result)
      } catch (e: Exception) {
        continuation.resumeWithException(e)
      }
    }
    continuation.invokeOnCancellation {
      future.cancel(true)
    }
  }
}
I have a lot of code that checks thread interruption for cooperative cancellation, and it would be nice to be able to re-use that code in the context of a coroutine, while still having it behave externally using this API the same as if it was written with `withContext` internally. But I can't seem to get the `join` to ever wait the same way it does when using `withContext`.
Ultimately, I'm trying to decide whether there is a way to fill in the body of this extension function:
suspend fun <T> ExecutorService.submitAndAwait(task: () -> T): T {
  // TODO: Is there something that can go here that
  //  - suspends the calling coroutine
  //  - runs the task on a thread provided by this ExecutorService
  //  - interrupts the worker thread when the coroutine context is cancelled, to interoperate nicely with legacy code
  //    that cooperates with cancellation by checking thread interruption
  //  - otherwise behaves the same as if we'd just used withContext, such that calling job.join() after job.cancel()
  //    will not resume until the task has finished
}
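One possible way to fill in that body, offered as a sketch rather than a confirmed answer from this thread: it assumes a kotlinx.coroutines version that provides `runInterruptible`, which nobody in the conversation mentions. `runInterruptible` runs a blocking block in the given context and interrupts the thread when the coroutine is cancelled, and because it is built on `withContext`, the caller should stay suspended until the block has returned. The helper `cancellationOrder` and the spinning worker are made up for the demonstration.

```kotlin
import kotlinx.coroutines.*
import java.util.Collections
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Sketch only: runInterruptible is an assumption, it is not mentioned in the thread.
suspend fun <T> ExecutorService.submitAndAwait(task: () -> T): T =
    // Runs task on this executor, interrupts the worker thread on cancellation,
    // and keeps the caller suspended until task has actually returned or thrown.
    runInterruptible(asCoroutineDispatcher()) { task() }

// Hypothetical demo: cancels a job whose worker polls the interrupt flag.
fun cancellationOrder(): List<String> {
    val executor = Executors.newSingleThreadExecutor()
    val events = Collections.synchronizedList(mutableListOf<String>())
    try {
        runBlocking {
            val job = launch(Dispatchers.Default) {
                try {
                    executor.submitAndAwait {
                        // Stand-in for legacy code that checks thread interruption.
                        while (!Thread.currentThread().isInterrupted) Thread.yield()
                        events.add("worker saw interruption")
                    }
                } finally {
                    events.add("cleaning up")
                }
            }
            delay(200) // give the worker time to start
            job.cancelAndJoin()
            events.add("after join")
        }
    } finally {
        executor.shutdown()
    }
    return events.toList()
}

fun main() {
    println(cancellationOrder()) // [worker saw interruption, cleaning up, after join]
}
```

If the legacy code reacts to interruption by throwing `InterruptedException` instead of returning, `runInterruptible` turns that into a `CancellationException`, so the caller still sees an ordinary cancellation.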

bezrukov

01/23/2020, 6:28 AM
When you are using `withContext`, none of the thread interruption checks work, so it's a simple blocking call, because the dispatcher's thread isn't marked as interrupted. That's why in your case the sleep wasn't interrupted.
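A small sketch of that point: cancelling the job does not interrupt the thread that runs a plain `withContext` block, so a blocking `Thread.sleep` inside it runs to the end. The function name `cancelInterruptsWithContextThread` and the shortened timings are made up for the example.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicBoolean

// Returns true if the thread running the withContext block was interrupted by job.cancel().
fun cancelInterruptsWithContextThread(): Boolean {
    val sawInterrupt = AtomicBoolean(false)
    runBlocking {
        val job = launch {
            withContext(Dispatchers.IO) {
                try {
                    Thread.sleep(500) // would throw InterruptedException if the thread were interrupted
                } catch (e: InterruptedException) {
                    sawInterrupt.set(true)
                }
            }
        }
        delay(100)
        job.cancel()
        job.join() // waits for the sleep to run to completion
    }
    return sawInterrupt.get()
}

fun main() {
    println("interrupted = ${cancelInterruptsWithContextThread()}") // interrupted = false
}
```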

paulblessing

01/23/2020, 11:37 AM
Perhaps the example with the sleeps was a bad one to use and I think it may be getting in the way. Feel free to ignore that. I'm simply looking for a way to write a suspending function that can run existing code that already checks for thread interruption (no sleeps anywhere) on threads provided by an ExecutorService. I essentially would like it to work externally the same as withContext would, except that internally the worker thread can also get interrupted when the coroutine's job is cancelled.

bezrukov

01/23/2020, 12:03 PM
The main reason I use sleep in my comments is that it supports cooperative cancellation by checking interruption, so it can stand in for your blocking code that checks thread interruption. So I see a difference between what you are trying to achieve and what you are calling "the same as withContext".
val job = GlobalScope.launch {
    withContext(someDispatcher) {
        while (!Thread.interrupted()) { // Your code that checks interruption
            // do nothing, just spin
        }
    }
}
delay(100)
println("cancel")
job.cancel()
println("after cancel")
job.join() // never returns, because the block inside withContext never finishes
println("after join") // never printed
The `submitAndAwait` extension looks good, but its behavior differs from your withContext expectations.

paulblessing

01/23/2020, 2:08 PM
Here's a hopefully more realistic example.
// Let's say this is the existing code that I don't have control of.
fun existingCode(items: List<Item>) {
  for (item in items) {
    if (Thread.currentThread().isInterrupted) {
      println("[Worker] Interrupted, not processing any more items")
      return
    }

    println("[Worker] Start work for $item")
    nonTrivialWork(item)
    println("[Worker] Finished work for $item")
  }
}

@Test fun example() {
  val items = List(100) { index -> Item(id = index) }
  val job = GlobalScope.launch {
    try {
      executorService.submitAndAwait { existingCode(items) }
    } finally {
      // Would like this cleanup code to wait to execute until the worker had the chance to respond to interruption
      // i.e. would like to have the line "[Worker] Interrupted..." printed before the line "Cleaning up"
      withContext(NonCancellable) {
        println("Cleaning up")
        cleanUp()
      }
    }
  }
  runBlocking {
    delay(300) // Give the background work enough time to actually start before cancelling
    println("Cancelling the job")
    job.cancel()
    delay(1000)
  }
}
What I mean when I say I'd like it to behave similar to `withContext` is that if I did have the ability to update this existing code to take in the `Job` and use it as a cancellation token, I could use `withContext` and get it to behave the way I would like.
fun hypotheticalCode(items: List<Item>, job: Job) {
  for (item in items) {
    if (job.isCancelled) {
      println("[Worker] Job cancelled, not processing any more items")
      return
    }

    println("[Worker] Start work for $item")
    nonTrivialWork(item)
    println("[Worker] Finished work for $item")
  }
}

@Test fun example2() {
  val items = List(100) { index -> Item(id = index) }
  val job = GlobalScope.launch {
    try {
      withContext(executorService.asCoroutineDispatcher()) { hypotheticalCode(items, coroutineContext[Job]!!) }
    } finally {
      withContext(NonCancellable) {
        // In this example, the cleanup code does wait until the worker had the chance to respond to the job cancellation
        // So we _do_ get "[Worker] Job cancelled..." printed before the line "Cleaning up"
        println("Cleaning up")
        cleanUp()
      }
    }
  }
  runBlocking {
    delay(300)
    println("Cancelling the job")
    job.cancel()
    delay(5000)
  }
}
I'm guessing it's probably not easy or possible. Even replacing that `withContext` call with an `async`/`await` (ignoring the IDE suggestion to replace it with `withContext`) causes the behavior to change such that the cleanup will happen before the worker had a chance to respond to cancellation. It's even pointed out in the docs for `await` that cancellation causes immediate resumption. Thanks for the help either way. If anyone can think of a way to pull off what I'm trying to do, I'm still all ears.
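The `async`/`await` ordering described above can be reproduced with a small sketch. The function name `asyncAwaitOrder`, the use of `Dispatchers.IO`, and the timings are made up for the example; the blocking sleep stands in for a worker that has not yet reacted to cancellation.

```kotlin
import kotlinx.coroutines.*
import java.util.Collections

// Records the order of events when a job suspended in await() is cancelled.
fun asyncAwaitOrder(): List<String> {
    val events = Collections.synchronizedList(mutableListOf<String>())
    runBlocking {
        val job = launch(Dispatchers.Default) {
            try {
                async(Dispatchers.IO) {
                    Thread.sleep(500) // blocking work that ignores cancellation
                    events.add("worker finished")
                }.await() // resumes with CancellationException as soon as the job is cancelled
            } finally {
                events.add("cleaning up")
            }
        }
        delay(100)
        job.cancelAndJoin() // join still waits for the async child to complete
        events.add("after join")
    }
    return events.toList()
}

fun main() {
    println(asyncAwaitOrder()) // [cleaning up, worker finished, after join]
}
```

So `join()` on the outer job still waits for the worker, but the `finally` block no longer does, which matches the behavior change reported here.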