#arrow
k

Kristian Frøhlich

09/10/2023, 8:54 AM
So, I’m using SuspendApp to handle clean shutdown of resources and would say that the structured concurrency part is working nicely. I have a question about semantics though and want to verify whether my expectations are valid 🙂 I’m using @simon.vergauwen's nice little helper function to create coroutine scopes as resources:
context(ResourceScope)
suspend fun coroutineScope(context: CoroutineContext) =
    install({ CoroutineScope(context) }, { scope, exitCase ->
        when (exitCase) {
            ExitCase.Completed -> scope.cancel()
            is ExitCase.Cancelled -> scope.cancel(exitCase.exception)
            is ExitCase.Failure -> scope.cancel("Resource failed, so cancelling associated scope", exitCase.failure)
        }
        scope.coroutineContext.job.join()
    })
When using this in a processor like the following:
context (ResourceScope)
fun schemaProcessor(): SchemaProcessor = SchemaProcessor {
    val scope = coroutineScope(Dispatchers.IO)
    return@SchemaProcessor scope.launch(Job(coroutineContext.job)) {
        delay(10000)
        throw IllegalArgumentException("BOOM")
    }
}
...to force an exception, I would expect this to trigger `ExitCase.Failure`, but `ExitCase.Cancelled` is triggered. It looks like the failure case is never reached, whatever exception is thrown. This means that if I want to do e.g. logging, I have to check the exception type in the cancelled block instead of just writing out the failure. Does it work like this because of the nature of coroutines? In what cases would `ExitCase.Failure` be triggered?
s

simon.vergauwen

09/11/2023, 6:46 AM
Hey @Kristian Frøhlich, do you mean `ExitCase.Failure` inside `coroutineScope`? The `ExitCase` comes from the ResourceScope, so in this case not from the `launch`/`Job` running inside, but that might depend a bit on how the jobs in `Job(coroutineContext.job)` relate to each other. I have a couple of questions to try and figure out what is going on, since I am not entirely sure how `SchemaProcessor` works. Is it something like:
fun interface SchemaProcessor {
  suspend fun name(): Job
}
Since you're calling `launch(Job(coroutineContext.job))`, I assume that the `coroutineContext` where `ResourceScope` is being created already has a `Job`? Is that coming from `runBlocking`?
k

Kristian Frøhlich

09/11/2023, 6:52 AM
Yeah, inside the coroutineScope. The processor is defined like you assume, too. Maybe I should just show you what my resourceScope looks like too, give me a sec.
fun main(): Unit = SuspendApp {
    val config = config()
    resourceScope {
        val hikari = hikari(config.database.toProperties())
        val schemaDatabase = schemaDatabase(hikari)

        with(config, schemaDatabase, logger()) {
            with(schemaRepository()) {
                with(schemaService(), schemaConsumer()) {
                    schemaProcessor().process()
                    server(Netty, host = "0.0.0.0", port = 8080, preWait = 5.seconds) { setup() }
                }
            }
        }
        awaitCancellation()
    }
}
The main purpose of `launch(Job(coroutineContext.job))` is to link it with the parent job, but I think you know that already 🙂 Otherwise it won’t propagate the exception. I’m open to solving it in a different way if there are more idiomatic ways.
s

simon.vergauwen

09/11/2023, 7:17 AM
Yes, but I thought that would've happened automatically 🤔 but now I see that's not the case. I think it would've also worked with this updated code.
context(ResourceScope)
suspend fun coroutineScope(context: CoroutineContext) =
    install({ CoroutineScope(currentCoroutineContext() + context) }, { scope, exitCase ->
        when (exitCase) {
            ExitCase.Completed -> scope.cancel()
            is ExitCase.Cancelled -> scope.cancel(exitCase.exception)
            is ExitCase.Failure -> scope.cancel("Resource failed, so cancelling associated scope", exitCase.failure)
        }
        scope.coroutineContext.job.join()
    })
But that is regardless of your original question of course.
k

Kristian Frøhlich

09/11/2023, 7:19 AM
Right
s

simon.vergauwen

09/11/2023, 7:20 AM
I think this is related to `server(Netty, ...` because my minimal example prints:
3
ExitCase.Completed
coroutineScope: ExitCase.Completed
k

Kristian Frøhlich

09/11/2023, 7:23 AM
Hmmmmm
s

simon.vergauwen

09/11/2023, 7:23 AM
Ah wait, never mind, `runBlocking` and `SuspendApp` work a bit differently
k

Kristian Frøhlich

09/11/2023, 7:24 AM
👍
s

simon.vergauwen

09/11/2023, 7:36 AM
Okay, ran into something super odd 😅
Anyhow, the reason for `ExitCase.Cancelled` is `awaitCancellation`. Since the `IllegalArgumentException` inside `launch` cancels the parent, `awaitCancellation` throws its `CancellationException`. If you check the `cause` inside `ExitCase.Cancelled`, you'll get the original `IllegalArgumentException`. So to finally answer your original questions:
• Yes, this is how coroutines work. A child cancelling its parent results in the original exception being wrapped in `CancellationException`. So the `ResourceScope` only ever sees `CancellationException` and not `IllegalArgumentException`. It's present in the `cause` if you want to have it logged though.
• So a child cannot trigger `ExitCase.Failure`; this would be triggered only if an exception is thrown inside the `resourceScope` directly, for example if you replace `awaitCancellation` with throwing an `IllegalArgumentException`.
Sorry for some of the confusion above, I always need to think a little about what's going on 😅
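The wrapping described above can be reproduced with plain kotlinx.coroutines, without Arrow or SuspendApp. The following is only a sketch under that assumption: `observeChildFailure` is a made-up name, and a regular `coroutineScope { }` stands in for the `resourceScope` body. It walks the `cause` chain instead of reading `cause` once, since cancellation wrappers may be nested.

```kotlin
import kotlinx.coroutines.*

// Hypothetical demo: a child fails while the scope body waits in awaitCancellation().
// Returns what the body observed, paired with what the scope itself rethrew.
suspend fun observeChildFailure(): Pair<Throwable?, Throwable?> {
    var seenByBody: Throwable? = null
    val outcome = runCatching {
        coroutineScope {
            launch {
                delay(50)
                throw IllegalArgumentException("BOOM")
            }
            try {
                awaitCancellation()
            } catch (e: CancellationException) {
                seenByBody = e // the body only ever sees a CancellationException
                throw e
            }
        }
    }
    return seenByBody to outcome.exceptionOrNull()
}

fun main() = runBlocking {
    val (seen, rethrown) = observeChildFailure()
    // Walk the cause chain to find the first exception that is not a cancellation.
    val original = generateSequence(seen) { it.cause }.firstOrNull { it !is CancellationException }
    println("body saw a CancellationException: ${seen is CancellationException}")
    println("original failure in the cause chain: $original")
    println("scope rethrew: $rethrown")
}
```

The body is in the same position as the `resourceScope` lambda in the thread: it is cancelled rather than failed, and the original exception is only reachable through the cause chain.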
k

Kristian Frøhlich

09/11/2023, 8:22 AM
Well, yeah, I had a feeling it was heading in this direction. Thanks for the effort though 😅 So basically how I’m currently solving it is the way to go. Quick conclusion would be that:
1. A CancellationException without a cause == normal shutdown / expected behaviour - do nothing
2. A CancellationException with a cause is a Failure - unwrap it and log
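The two-case rule in this message could be sketched as a small helper. The names `underlyingFailure` and `describe` are hypothetical and not part of Arrow or kotlinx.coroutines; the helper walks the whole cause chain on the assumption that one cancellation may wrap another.

```kotlin
import kotlin.coroutines.cancellation.CancellationException

// Hypothetical helper: the first non-cancellation exception in the cause chain,
// or null when the cancellation carries no underlying failure.
fun underlyingFailure(e: CancellationException): Throwable? =
    generateSequence(e.cause) { it.cause }.firstOrNull { it !is CancellationException }

// Case 1: no underlying failure means a normal shutdown. Case 2: unwrap and report it.
fun describe(e: CancellationException): String =
    underlyingFailure(e)?.let { "failure: $it" } ?: "normal shutdown"

fun main() {
    println(describe(CancellationException("scope closed")))
    // prints "normal shutdown"
    println(describe(CancellationException("child failed", IllegalArgumentException("BOOM"))))
    // prints "failure: java.lang.IllegalArgumentException: BOOM"
}
```

Inside the `ExitCase.Cancelled` branch, `describe(exitCase.exception)` would then decide whether anything needs to be logged.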
s

simon.vergauwen

09/11/2023, 8:44 AM
Yes, exactly. Sadly that's quite common practice for logging when cancellation comes into play. What I've seen is that the logging typically happens inside `launch` itself, since that's "a unit of work". Logging at the end of the cancellation process can be considered bad practice because it mixes concerns.
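One way to log inside `launch` itself is a thin wrapper around it. This is a sketch only: `launchLogged` and its `log` parameter are hypothetical names, and the empty `CoroutineExceptionHandler` in `main` is there just to keep the demo quiet after the failure has been logged.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: log a failure where the unit of work is defined, then rethrow
// so structured concurrency still propagates it to the parent as usual.
fun CoroutineScope.launchLogged(
    name: String,
    log: (String) -> Unit = { println(it) },
    block: suspend CoroutineScope.() -> Unit,
): Job = launch {
    try {
        block()
    } catch (e: CancellationException) {
        throw e // cancellation is not a failure of this unit of work
    } catch (e: Throwable) {
        log("[$name] failed: $e")
        throw e
    }
}

fun main() = runBlocking {
    val logged = mutableListOf<String>()
    // The failure is already logged by launchLogged, so the handler stays empty.
    val scope = CoroutineScope(SupervisorJob() + CoroutineExceptionHandler { _, _ -> })
    scope.launchLogged("schema-processor", log = { logged += it }) {
        throw IllegalArgumentException("BOOM")
    }.join()
    println(logged)
    scope.cancel()
}
```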
👍 1
k

Kristian Frøhlich

09/11/2023, 9:29 AM
Simon Vergauwen [MOD] [9:17 AM]
Yes, but I thought that would’ve happened automatically 🤔 but now I see that’s not the case. I think it would’ve also worked with this updated code.
I guess that means removing the explicit linking in the launch... that does not seem to work for me. I agree it would be nice if one could link the child coroutine to the parent without doing it explicitly for every launch.
...works if I move it to where we are actually creating the scope, though:
val scope = coroutineScope(Dispatchers.IO) + currentCoroutineContext()
👍 1
s

simon.vergauwen

09/12/2023, 5:41 AM
Where did you get that snippet? https://github.com/xebia-functional/gh-alerts-subscriptions-kotlin? I'm going to update it to inherit the outer parent job.
does not seem to work for me.
Seems to be something strange going on, I have several versions that work and several that don't 🤕 but it should all be the same
k

Kristian Frøhlich

09/12/2023, 6:05 AM
@simon.vergauwen Had a look again last night and made it work. Just needed to think about what’s going on 🙂 If we lift the `currentCoroutineContext()` call outside the `CoroutineScope(..)` call and remove the explicit `...job.join()`, it flows as expected:
context(ResourceScope)
suspend fun coroutineScope(context: CoroutineContext): CoroutineScope {
    val currentContext = currentCoroutineContext()
    return install({ CoroutineScope(currentContext + context) }, { scope, exitCase ->
        when (exitCase) {
            ExitCase.Completed -> scope.cancel()
            is ExitCase.Cancelled -> scope.cancel(exitCase.exception)
            is ExitCase.Failure -> scope.cancel("Resource failed, so cancelling associated scope", exitCase.failure)
        }
    })
}
it’s nice to have the parent / child relationship automatically wired up for new coroutines
s

simon.vergauwen

09/12/2023, 6:09 AM
Any idea why you had to "remove the explicit ...job.join()"?
k

Kristian Frøhlich

09/12/2023, 6:10 AM
Not sure why it hangs forever if I don't remove it. Might be some coroutine magic around cancellation.
The job is in a cancellation state when I inspect it, so it should work
s

simon.vergauwen

09/12/2023, 6:20 AM
I'm thinking this is the most accurate compared to `coroutineScope { }`, after digging again into the internals of `coroutineScope`. WDYT? I think it was missing a new job that inherits from the parent.
context(ResourceScope)
suspend fun coroutineScope(context: CoroutineContext): CoroutineScope {
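    val currentContext = currentCoroutineContext()
    // New child Job, so cancelling this scope does not cancel the caller's own Job.
    val job = currentContext[Job]?.let { Job(it) } ?: Job()
    // `context` is added after `currentContext` so its elements (e.g. a dispatcher) take precedence.
    return install({ CoroutineScope(currentContext + context + job) }) { scope, exitCase ->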
        println(exitCase)
        when (exitCase) {
            ExitCase.Completed -> job.cancel()
            is ExitCase.Cancelled -> job.cancel(exitCase.exception)
            is ExitCase.Failure -> job.cancel("Resource failed, so cancelling associated scope", exitCase.failure)
        }
        job.join()
    }
}
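The "new job that inherits from the parent" can be looked at in isolation with plain kotlinx.coroutines. This is a minimal sketch, and `childJobDemo` is a made-up name. It shows that cancelling a `Job(parent)` cancels the coroutines launched under it while the parent stays active, which is why the release step can cancel and then join that job.

```kotlin
import kotlinx.coroutines.*

// Hypothetical demo of Job(parent): returns (work was cancelled, parent is still active).
suspend fun childJobDemo(): Pair<Boolean, Boolean> = coroutineScope {
    val parent = coroutineContext.job
    val child = Job(parent) // child of `parent`, but a separate Job instance
    val scope = CoroutineScope(Dispatchers.Default + child)
    val work = scope.launch { awaitCancellation() }

    child.cancel() // cancels `work`; a plain cancellation does not cancel `parent`
    child.join()   // returns once `child` and everything under it has completed
    work.isCancelled to parent.isActive
}

fun main() = runBlocking {
    val (workCancelled, parentActive) = childJobDemo()
    println("work cancelled: $workCancelled, parent still active: $parentActive")
    // prints "work cancelled: true, parent still active: true"
}
```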
k

Kristian Frøhlich

09/12/2023, 6:25 AM
Ahh, of course. That’s the missing piece. Yep, flows nicely now
🙌 1
That’s a very nice helper function now 🙂
s

simon.vergauwen

09/12/2023, 6:28 AM
I've thought of adding it into Arrow in the past. Do you think it would be interesting to add?
k

Kristian Frøhlich

09/12/2023, 6:29 AM
That would be super nice.
Since we're talking about libraries, just a quick question I would like your thoughts on. This is more of a SuspendApp question, but related to what we’ve been discussing. Since I’m using the code above to handle and log all exceptions, I find it quite redundant to let the exceptions bubble all the way out to the edge of my apps and let the default exception handler print the stack trace to the console when I already log everything. I often find myself silencing that by doing something like this:
SuspendApp(SupervisorJob() + CoroutineExceptionHandler { _, _ -> })...
Would it make sense to have a shortcut to this through the library? E.g. SuspendApp(emptyExHandler = true)? Or something similar.
@simon.vergauwen Any thoughts / feedback on this ? ^^
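For reference, this is roughly what the `SupervisorJob() + CoroutineExceptionHandler` combination does on an ordinary `CoroutineScope`. It is a sketch with plain kotlinx.coroutines, not SuspendApp itself, and `runWithHandler` is a hypothetical name: the handler receives the uncaught exception instead of the default handler printing it, and the supervisor keeps a failing coroutine from cancelling its siblings.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical demo: returns the exceptions seen by the handler and whether
// the sibling coroutine was cancelled by the failure.
suspend fun runWithHandler(): Pair<List<Throwable>, Boolean> {
    val seen = CopyOnWriteArrayList<Throwable>()
    val handler = CoroutineExceptionHandler { _, e -> seen += e }
    val scope = CoroutineScope(SupervisorJob() + handler)

    val failing = scope.launch { throw IllegalStateException("already logged elsewhere") }
    val healthy = scope.launch { delay(10) }
    joinAll(failing, healthy)
    scope.cancel()
    return seen to healthy.isCancelled
}

fun main() = runBlocking {
    val (seen, siblingCancelled) = runWithHandler()
    println("handler saw: $seen, sibling cancelled: $siblingCancelled")
}
```

Replacing `seen += e` with an empty body gives the silencing handler from the message above.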
s

simon.vergauwen

09/14/2023, 10:42 AM
Hey Kristian, sorry I forgot to reply. So far I've tried to maintain a similar signature to `runBlocking`.
handle and log all exceptions i find it quite redundant to let the exceptions bubble all the way out to the edge of my apps
I do the same thing, but then it's no longer necessary to install `SupervisorJob() + CoroutineExceptionHandler { _, _ -> }`, right? 🤔 What is the reason for the `SupervisorJob()` btw? I'm not sure if it makes sense for the library, but it's very annoying if exceptions get logged twice, since it sometimes gives the impression that something is wrong when it's not
k

Kristian Frøhlich

09/14/2023, 10:44 AM
No worries. Had a quick look at the source code in SuspendApp and it’s the outer job defined there that’s printing the exception. I have no way of overriding the default exception handler for that job, so I’m basically stopping the propagation by creating a SupervisorJob.
Would it possibly make sense to have an opt-in for the actual exception handler, which could then be used (if defined) by the SuspendApp job? I’m talking about this job:
val job: Job = launch(context = context, start = CoroutineStart.LAZY) { block() }
Or maybe I understood you incorrectly 🤔
the actual exception handler
The code that runs inside `job` is the lambda you pass to `SuspendApp { }`, so you can install any exception handler inside that lambda
For example on Kotlin/Native I had to resort to this, since I had some exceptions hanging my native process on K8s. https://github.com/nomisRev/ktor-k8s-zero-downtime/blob/33739b6cb4db1661aa3c8f7b7293de5c80246994/src/commonMain/kotlin/com/fortysevendegrees/main.kt#L35 I catch all errors using `either` (or `Result`) and log them without rethrowing, so the process exits with exit code 0
k

Kristian Frøhlich

09/14/2023, 11:17 AM
Will still propagate unless i create a SupervisorJob. If I just pass a Job the printed stack trace also includes info about parent job cancelling
Either won’t catch the Cancellation exception, right ? The default exception handler prints the wrapped exception I would think
s

simon.vergauwen

09/14/2023, 1:14 PM
No, `Either` won't catch `CancellationException`; you need `Result` for that
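The difference can be illustrated with the standard library alone. In the sketch below, `catchNonCancellation` is a hypothetical stand-in for a catch that lets cancellation through, as described in the reply above for `Either`; it is not Arrow's implementation. `runCatching` is the stdlib function, and it catches every `Throwable`, `CancellationException` included.

```kotlin
import kotlin.coroutines.cancellation.CancellationException

// Hypothetical stand-in for a catch that rethrows cancellation instead of capturing it.
inline fun <A> catchNonCancellation(block: () -> A): Result<A> =
    try {
        Result.success(block())
    } catch (e: CancellationException) {
        throw e
    } catch (e: Throwable) {
        Result.failure(e)
    }

fun main() {
    // runCatching captures the CancellationException as a failed Result.
    val viaResult = runCatching<Unit> { throw CancellationException("stop") }
    println(viaResult.isFailure) // prints "true"

    // The cancellation-aware variant lets it escape to the caller.
    val escaped = try {
        catchNonCancellation<Unit> { throw CancellationException("stop") }
        false
    } catch (e: CancellationException) {
        true
    }
    println(escaped) // prints "true"
}
```

Swallowing a `CancellationException` this way is a deliberate choice at the very edge of an application; inside coroutines it usually needs to be rethrown so cancellation keeps working.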
k

Kristian Frøhlich

09/14/2023, 1:17 PM
Right. I’m just trying to wrap my head around how this should work 😅 In all the services I’ve written so far I had to do that Supervisor + empty ex-handler stuff at the top level to stop it from propagating
Hence the question if it should be a shortcut in the library.
I’m also using the latest version of SuspendApp if that helps