# coroutines
m
Hi everyone, my team is having a hard time understanding whether launching a coroutine at a certain point of our application is useful or not. We’re developing a gRPC service, and inside a global interceptor we need to publish a domain event for tracking purposes. Since this action should be of the “fire & forget” kind, we thought it would be a good idea to launch a coroutine for publishing the event and then carry on with the execution. Here’s a simplified version of our code with changed names:
class PublishingServerCall<ReqT, RespT>(
    next: ServerCall<ReqT, RespT>,
    val publisher: EventPublisher
) : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(next) {
    override fun close(status: Status, trailers: Metadata) {
        if (!status.isOk) {
            runBlocking {
                supervisorScope {
                    launch(Dispatchers.Unconfined) {
                        publisher.publishError(status)
                    }
                }
            }
        }
        super.close(status, trailers)
    }
}
I know there’s a lot of gRPC-specific stuff, but bear with me, because the details are important. Here is an implementation of a
io.grpc.ServerCall
, an abstraction needed by gRPC infrastructure to implement a custom interceptor. As you see we override the
close()
function because we’re interested in firing an event when the call is returning from a service and we have a response
Status
. We thought that this setup would allow us to publish the event asynchronously and move on in the interceptor chain without waiting for the publisher to complete. It seems to be working so far, but we have only done some manual tests on our machines; it hasn’t been deployed yet. The more I look at this code, though, the more I’m convinced it’s not doing what we thought: I have a feeling that the whole
runBlocking {}
part is useless and it’s blocking the thread anyway waiting for the return, and it would be the same to just invoke the publisher directly. Of course it’s needed because the enclosing function is not a suspending function, but I’m full of doubts and don’t know if we wrote the right thing. Is this function blocking the request thread, or is it returning right away without waiting for the publication? Although we are experienced in service programming and Kotlin, we’re pretty much beginners with coroutines, so if anyone has the expertise, any advice would be much appreciated. Thank you!
j
I have a feeling that the whole
runBlocking {}
part is [...] blocking the thread anyway waiting for the return, and it would be the same to just invoke the publisher directly
Exactly
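To see this for yourself, here is a minimal sketch that mirrors the structure of close() from the question and measures how long the caller is held up. The slowPublish function is a hypothetical stand-in for publisher.publishError(status) and is assumed to take about 200 ms.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for publisher.publishError(status): suspends for ~200 ms.
suspend fun slowPublish() {
    delay(200)
}

// Same nesting as in the question; returns how long the calling thread was blocked.
fun timeBlockingPublish(): Long = measureTimeMillis {
    runBlocking {
        supervisorScope {
            launch(Dispatchers.Unconfined) { slowPublish() }
        }
    }
}

fun main() {
    println("caller was blocked for ${timeBlockingPublish()} ms")
}
```

The measured time should be roughly the duration of the publication, because runBlocking only returns once every coroutine launched inside it has completed.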
m
ok bad news 😞 is there a way to simply launch the event publication in a separate coroutine/thread and return immediately?
the ServerCall docs say at some point:
Methods are guaranteed to be non-blocking. Implementations are not required to be thread-safe.
does this mean that we can do whatever we want here and the framework is handling concurrency for us?
j
runBlocking
is only one way to launch coroutines. And this way of doing things is meant for when you want to wait for the result in a context where you cannot suspend (such as a callback or method implementation from an external API). Another way is to create a
CoroutineScope
and store it in a property somewhere, then use that scope to launch the coroutine. Cancel the scope when you know all coroutines launched in it are no longer needed.
I don't know the grpc-java framework, but it seems the doc of the close method suggests that there is a listener API that you could use instead of overriding it:
If no errors or cancellations are known to have occurred, then a
ServerCall.Listener.onComplete()
notification should be expected, independent of
status
. Otherwise
ServerCall.Listener.onCancel()
has been or will be called.
s
What Joffrey said. Provide a CoroutineScope to the PublishingServerCall class (e.g. publisherScope) and, instead of calling runBlocking, call publisherScope.launch (the call to supervisorScope is no longer needed either)
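A rough sketch of that suggestion, with the gRPC base class left out so it runs on its own. PublishingCall, its close(ok, description) signature and the publishError lambda are hypothetical simplifications of the code in the question, not the real grpc-java or EventPublisher API.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Simplified stand-in for PublishingServerCall (no gRPC types involved).
class PublishingCall(
    private val publisherScope: CoroutineScope,
    private val publishError: suspend (String) -> Unit
) {
    // Launches the publication and returns immediately.
    // The Job is returned only so that the demo below can wait for it.
    fun close(ok: Boolean, description: String): Job? {
        if (ok) return null
        return publisherScope.launch { publishError(description) }
    }
}

// Returns true if close() handed control back before the publication finished,
// and the publication still completed afterwards.
fun closeReturnsBeforePublishing(): Boolean {
    val scope = CoroutineScope(Dispatchers.Default)
    var published = false
    val call = PublishingCall(scope) { delay(200); published = true }
    val job = call.close(ok = false, description = "UNAVAILABLE")
    val returnedEarly = !published
    runBlocking { job?.join() } // only the demo waits; close() itself did not
    return returnedEarly && published
}

fun main() {
    println(closeReturnsBeforePublishing())
}
```

In the real class, the scope would be passed in next to the publisher, and close() would go on to call super.close(status, trailers) right after the launch.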
m
Cool thanks both for the advice!
j
the call to supervisorScope is no longer needed either
The call to
supervisorScope
was actually already pointless under
runBlocking
EDIT: scratch that, see below
m
@Joffrey the
ServerCall.Listener
is another abstraction for intercepting the actual request messages; it may or may not be used (we do use it, for a particular reason, but I omitted that code for simplicity). A
ServerCall
represents a remote call (RPC) to the service but doesn’t have access to the messages, only the Listener does.
The call to
supervisorScope
was actually already pointless under
runBlocking
yeah we basically don’t know very well what we’re doing with coroutines 😅 , we thought it was needed so that if the publisher raised an exception it wouldn’t fail the parent scope
Also, my teammate who wrote the initial code ended up using
Dispatchers.Unconfined
because the default one “wasn’t working”; I’ve read the docs about dispatchers many times but I’m still unsure if this is needed…
j
Actually I was wrong here. I was thinking in terms of
supervisorScope
not cancelling other children in case of failure of 1 child, but here there is only 1 child anyway, so in that sense it is pointless. However, it is still useful in the sense that exceptions would otherwise bubble up to the
runBlocking
call. That said, when using
runBlocking
you usually want those exceptions, since you're waiting for your coroutines here.
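A small sketch of the difference, assuming a hypothetical PublishFailed exception thrown by the child. Without supervisorScope the failure comes out of runBlocking; with it, the failure stays in the child and goes to that child's CoroutineExceptionHandler (or to the thread's uncaught exception handler if none is installed).

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

// Hypothetical failure raised by the publisher.
class PublishFailed : RuntimeException("publish failed")

// Without supervisorScope: the child's failure cancels runBlocking, which rethrows it.
fun failureReachesCaller(): Boolean =
    try {
        runBlocking {
            launch { throw PublishFailed() }
        }
        false
    } catch (e: PublishFailed) {
        true
    }

// With supervisorScope: runBlocking completes normally and the child's handler is invoked.
fun failureStaysInChild(): Boolean {
    var handled = false
    val handler = CoroutineExceptionHandler { _, _ -> handled = true }
    runBlocking {
        supervisorScope {
            launch(handler) { throw PublishFailed() }
        }
    }
    return handled
}

fun main() {
    println(failureReachesCaller())
    println(failureStaysInChild())
}
```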
because the default one “wasn’t working”
It would be interesting to know exactly what they mean by this
m
I don’t remember exactly, I’ll ask them again. I vaguely recall something like the service startup or some integration tests hanging with the default dispatcher, and using Unconfined solved it… 🤷
j
Unconfined basically "starts" the execution of the coroutine in the current thread, up to the first suspension point. If you're not using any calls to suspend functions in there, there is no suspension point, and everything actually runs immediately on the current thread... which I guess is the opposite of what you wanted?
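As an illustration, this sketch records which thread the body of launch(Dispatchers.Unconfined) runs on when it contains no suspending calls. The function name is made up for the example.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns true if the launched body ran on the calling thread
// and had already finished by the time launch() returned.
fun unconfinedRunsInline(): Boolean {
    val caller = Thread.currentThread()
    var ranOn: Thread? = null
    var finishedBeforeLaunchReturned = false
    runBlocking {
        launch(Dispatchers.Unconfined) {
            // No suspend calls in here, so there is no suspension point.
            ranOn = Thread.currentThread()
        }
        finishedBeforeLaunchReturned = ranOn != null
    }
    return ranOn === caller && finishedBeforeLaunchReturned
}

fun main() {
    println(unconfinedRunsInline())
}
```

So a blocking publisher call inside such a coroutine would hold up the caller exactly as a direct call would.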
m
how do I create a
CoroutineScope
in my interceptor? It needs a
CoroutineContext
… how do I create one?! 🤕
j
You can provide the
EmptyCoroutineContext
, but I would usually at least add a
CoroutineName("useful name")
or a dispatcher anyway, both of which are coroutine contexts:
val scope = CoroutineScope(CoroutineName("some-name"))

val scope = CoroutineScope(Dispatchers.Default)

val scope = CoroutineScope(Dispatchers.Default + CoroutineName("some-name"))
Basically every coroutine context element is also a
CoroutineContext
in itself, and you can optionally combine them together with
+
to create a new coroutine context. When you have no actual element to provide, just use the object
EmptyCoroutineContext
.
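To make the composition concrete, here is a short sketch that combines two elements with + and reads them back by key. The name "event-publisher" is only an example.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Two context elements combined into one context.
val publisherContext: CoroutineContext = Dispatchers.Default + CoroutineName("event-publisher")

// Each element can be looked up again through its key.
fun contextName(): String? = publisherContext[CoroutineName]?.name

// Dispatchers are stored under the ContinuationInterceptor key.
fun contextUsesDefaultDispatcher(): Boolean =
    publisherContext[ContinuationInterceptor] === Dispatchers.Default

// CoroutineScope(...) adds a Job when the given context has none.
fun emptyContextScopeHasJob(): Boolean =
    CoroutineScope(EmptyCoroutineContext).coroutineContext[Job] != null

fun main() {
    println(contextName())
    println(contextUsesDefaultDispatcher())
    println(emptyContextScopeHasJob())
}
```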
m
which I guess is the opposite of what you wanted?
Yeah, maybe that was the rationale behind it; apart from the strange hangs, we were led to think it was the right way to make a fire & forget call
@Joffrey
Cancel the scope when you know all coroutines launched in it are no longer needed
how do I know that, and where should I do it? Don’t coroutines terminate once their block completes? Currently I’m trying to create such a scope as a property of the interceptor (which is managed by Spring, so a singleton) and passing it to every new instance of
PublishingServerCall
j
Don’t coroutines terminate once their block completes?
They do, don't worry about this. It's more a matter of cleanup in case something hangs. If you're using these coroutines from some component with a lifecycle, then terminating that component would warrant cancelling the related scope to make sure all related coroutines get cancelled. If the component lives anyway for the whole life of the application, then you don't have to cancel it. It will act similarly to
GlobalScope
but with the addition of proper structured concurrency because yours will have a
Job
inside (automatically added).
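For the case where the component does have a lifecycle, a minimal sketch could look like this. EventForwarder is a hypothetical component, and close() stands for whatever shutdown hook the owning framework calls.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical component that owns a scope for as long as it lives.
class EventForwarder : AutoCloseable {
    private val scope = CoroutineScope(CoroutineName("event-forwarder"))

    // Fire & forget; the Job is returned only so the demo can inspect it.
    fun forward(work: suspend () -> Unit): Job = scope.launch { work() }

    // Cancelling the scope cancels every coroutine still running in it.
    override fun close() {
        scope.cancel()
    }
}

// Returns true if a coroutine that was still hanging got cancelled by close().
fun closeCancelsPendingWork(): Boolean {
    val forwarder = EventForwarder()
    val hanging = forwarder.forward { delay(60_000) } // simulates a stuck publication
    forwarder.close()
    runBlocking { hanging.join() }
    return hanging.isCancelled
}

fun main() {
    println(closeCancelsPendingWork())
}
```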
By the way, in your case you might want to create your scope with
CoroutineScope(SupervisorJob())
to make sure that if one of the coroutines crashes, it doesn't cancel the others and the whole scope with it. Check this out: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-supervisor-job.html
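The difference can be checked with a small sketch that launches one failing coroutine and then asks whether the scope is still usable. The empty CoroutineExceptionHandler is only there to keep the demo failure out of the console; it is not part of the suggestion above.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Launches one failing coroutine in a scope built on the given job, waits for it,
// and reports whether the scope can still launch new coroutines afterwards.
fun scopeSurvivesFailure(job: Job): Boolean {
    // Assumption for the demo: swallow the failure instead of printing it.
    val quiet = CoroutineExceptionHandler { _, _ -> }
    val scope = CoroutineScope(job + quiet)
    val failing = scope.launch { throw IllegalStateException("publish failed") }
    runBlocking { failing.join() }
    return scope.isActive
}

fun main() {
    println("SupervisorJob: " + scopeSurvivesFailure(SupervisorJob()))
    println("Job: " + scopeSurvivesFailure(Job()))
}
```

With a plain Job the first failed publication cancels the scope, and every later launch in it would be cancelled right away; with SupervisorJob the scope stays active.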
m
Oh right, thank you! So we were almost right when choosing the supervisorScope 😉
If the component lives anyway for the whole life of the application, then you don’t have to cancel it.
yes, that’s our case: the interceptor lives as long as the service does. Is it okay then to instantiate the CoroutineScope inside it?