Matteo Mirk
07/20/2022, 12:51 PM
class PublishingServerCall<ReqT, RespT>(
    next: ServerCall<ReqT, RespT>,
    val publisher: EventPublisher
) : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(next) {
    override fun close(status: Status, trailers: Metadata) {
        if (!status.isOk) {
            runBlocking {
                supervisorScope {
                    launch(Dispatchers.Unconfined) {
                        publisher.publishError(status)
                    }
                }
            }
        }
        super.close(status, trailers)
    }
}
I know there’s a lot of gRPC-specific stuff, but bear with me, because the details are important. Here is an implementation of an io.grpc.ServerCall, an abstraction needed by the gRPC infrastructure to implement a custom interceptor. As you can see, we override the close() function because we’re interested in firing an event when the call is returning from a service and we have a response Status.
We thought that this setup would allow us to publish the event asynchronously and move on in the interceptor chain without waiting for the publisher to complete. It seems to be working so far, but we have only done some manual tests on our machines; it hasn’t been deployed yet. But the more I look at this code, the more I’m convinced it’s not doing what we thought: I have a feeling that the whole runBlocking {} part is useless and is blocking the thread anyway while waiting for the return, and that it would be the same to just invoke the publisher directly. Of course it’s needed because the enclosing function is not suspendable, but I’m full of doubts and don’t know if we wrote the right thing.
Is this function blocking the request thread or is it returning right away, without waiting for the publication?
Although we’re skilled in service programming and Kotlin, we’re pretty much beginners with coroutines, so if anyone has the expertise, any advice would be much appreciated. Thank you!
Joffrey
07/20/2022, 12:53 PM
> I have a feeling that the whole runBlocking {} part is [...] blocking the thread anyway waiting for the return, and it would be the same to just invoke the publisher directly
Exactly
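To make the point above concrete, here is a minimal sketch with the gRPC types stripped away. It assumes kotlinx.coroutines is on the classpath; `slowPublish` and `blockingClose` are hypothetical names standing in for the real publisher and the `close()` override.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

// Hypothetical stand-in for the real publisher: it only suspends for a while.
suspend fun slowPublish(delayMillis: Long) {
    delay(delayMillis)
}

// Same shape as the close() override in the question, minus the gRPC types.
// Returns true if the publish had already finished when runBlocking returned.
fun blockingClose(publishMillis: Long): Boolean {
    val published = AtomicBoolean(false)
    runBlocking {
        supervisorScope {
            launch(Dispatchers.Unconfined) {
                slowPublish(publishMillis)
                published.set(true)
            }
        }
    }
    // runBlocking only returns once every coroutine started inside it is done.
    return published.get()
}
```

Calling `blockingClose(100)` should return `true`: the calling thread is held until the launched coroutine completes, so nothing is gained over calling the publisher directly.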
Matteo Mirk
07/20/2022, 12:54 PM
> Methods are guaranteed to be non-blocking. Implementations are not required to be thread-safe.
Does this mean that we can do whatever we want here and the framework is handling concurrency for us?
Joffrey
07/20/2022, 12:56 PM
runBlocking is only one way to launch coroutines. And this way of doing things is meant for when you want to wait for the result in a context where you cannot suspend (such as a callback or method implementation from an external API).
Another way is to create a CoroutineScope and store it in a property somewhere, then use that scope to launch the coroutine. Cancel the scope when you know all coroutines launched in it are no longer needed.
> If no errors or cancellations are known to have occurred, then a ServerCall.Listener.onComplete() notification should be expected, independent of status. Otherwise ServerCall.Listener.onCancel() has been or will be called.
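The scope-in-a-property approach described above could be sketched roughly as follows. This is only an illustration under assumptions: `ErrorNotifier`, `notifyAsync` and `shutdown` are hypothetical names, and the dispatcher and coroutine name are arbitrary choices.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch

// Hypothetical component that owns a long-lived scope stored in a property.
class ErrorNotifier(
    private val scope: CoroutineScope =
        CoroutineScope(Dispatchers.Default + CoroutineName("error-notifier"))
) {
    // Non-suspending entry point: starts the coroutine and returns immediately.
    // The Job is returned only so that callers (or tests) can observe completion.
    fun notifyAsync(publish: suspend () -> Unit): Job = scope.launch { publish() }

    // Cancels every coroutine still running in the scope.
    fun shutdown() = scope.cancel()
}
```

Unlike runBlocking, `notifyAsync` does not wait: the caller gets control back while the publish is still running on the scope's dispatcher.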
streetsofboston
07/20/2022, 1:00 PM
Matteo Mirk
07/20/2022, 1:01 PM
Joffrey
07/20/2022, 1:04 PM
> the call to supervisorScope is no longer needed either
The call to supervisorScope was actually already pointless under runBlocking
Matteo Mirk
07/20/2022, 1:05 PM
ServerCall.Listener is another abstraction for intercepting actual request messages; it may or may not be used (it is in our case, for a particular reason, but I omitted the code for simplicity). A ServerCall represents a remote call (RPC) to the service but doesn’t have access to the messages, only the Listener does.
> The call to supervisorScope was actually already pointless under runBlocking
Yeah, we basically don’t know very well what we’re doing with coroutines 😅, we thought it was needed so that if the publisher raised an exception it wouldn’t fail the parent scope.
Dispatchers.Unconfined because the default one “wasn’t working”; I’ve read the docs about dispatchers many times but I’m still unsure if this is needed…
Joffrey
07/20/2022, 1:11 PM
supervisorScope not cancelling other children in case of failure of 1 child, but here there is only 1 child anyway, so in that sense it is pointless. However, it is still useful in the sense that exceptions would otherwise bubble up to the runBlocking call. That said, when using runBlocking you usually want those exceptions since you're waiting for your coroutines here.
> because the default one “wasn’t working”
It would be interesting to know exactly what they mean by this
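The difference in exception propagation mentioned above can be seen in a small sketch. The function names are hypothetical, and a CoroutineExceptionHandler is installed in the second case only so that the failure can be observed.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

// Without supervisorScope: the child's failure cancels runBlocking,
// which rethrows the exception to the caller.
fun failureReachesCaller(): Boolean =
    try {
        runBlocking {
            launch { throw IllegalStateException("publish failed") }
        }
        false
    } catch (e: IllegalStateException) {
        true
    }

// With supervisorScope: the failure stays with the child and is passed to the
// handler in the child's context instead of bubbling up to runBlocking.
fun failureStaysInChild(): Throwable? {
    var seen: Throwable? = null
    val handler = CoroutineExceptionHandler { _, e -> seen = e }
    runBlocking {
        supervisorScope {
            launch(handler) { throw IllegalStateException("publish failed") }
        }
    }
    return seen
}
```

In the second function runBlocking returns normally even though the child failed. Without a handler in the context, the exception would go to the thread's uncaught-exception handler instead.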
Matteo Mirk
07/20/2022, 1:20 PM
Joffrey
07/20/2022, 1:22 PM
Matteo Mirk
07/20/2022, 1:22 PM
CoroutineScope in my interceptor? It needs a CoroutineContext… how do I create one?! 🤕
Joffrey
07/20/2022, 1:23 PM
EmptyCoroutineContext, but I would usually at least add a CoroutineName("useful name") or a dispatcher anyway, both of which are coroutine contexts:
val scope = CoroutineScope(CoroutineName("some-name"))
val scope = CoroutineScope(Dispatchers.Default)
val scope = CoroutineScope(Dispatchers.Default + CoroutineName("some-name"))
Basically every coroutine context element is also a CoroutineContext in itself, and you can optionally combine them together with + to create a new coroutine context. When you have no actual element to provide, just use the object EmptyCoroutineContext.
Matteo Mirk
07/20/2022, 1:24 PM
> which I guess is the opposite of what you wanted?
Yeah, maybe that would be the rationale behind it; apart from strange hang-ups, we were led to think it was the right way to make a fire-and-forget call.
> Cancel the scope when you know all coroutines launched in it are no longer needed
How do I know that, and where should I perform it? Don’t coroutines terminate once their block completes? Currently I’m trying to create such a scope as a property of the interceptor (which is managed by Spring, so a singleton) and passing it to every new instance of PublishingServerCall
Joffrey
07/20/2022, 1:38 PM
> Don’t coroutines terminate once their block completes?
They do, don't worry about this. It's more a matter of cleanup in case something hangs. If you're using these coroutines from some component with a lifecycle, then terminating that component would warrant cancelling the related scope to make sure all related coroutines get cancelled. If the component lives anyway for the whole life of the application, then you don't have to cancel it. It will act similarly to GlobalScope but with the addition of proper structured concurrency because yours will have a Job inside (automatically added).
CoroutineScope(SupervisorJob()) to make sure that if one of the coroutines crashes, it doesn't cancel the others and the whole scope with it.
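As a rough illustration of that difference, the sketch below compares a scope built on SupervisorJob() with one built on a plain Job(). The helper `survivesChildFailure` and the value names are hypothetical, and the empty exception handler is only there to keep the demo quiet.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Launches one failing coroutine in the given scope, waits for it to finish,
// and reports whether the scope is still usable afterwards.
fun survivesChildFailure(scope: CoroutineScope): Boolean = runBlocking {
    scope.launch { throw IllegalStateException("publish failed") }.join()
    scope.isActive
}

// Swallows failures for the demo; a real handler would at least log them.
val ignoreFailures = CoroutineExceptionHandler { _, _ -> }

// A crashing child leaves this scope (and its other children) alone.
val supervised = CoroutineScope(SupervisorJob() + Dispatchers.Default + ignoreFailures)

// A crashing child cancels this scope, so later launches in it never run.
val plain = CoroutineScope(Job() + Dispatchers.Default + ignoreFailures)
```

Here `survivesChildFailure(supervised)` should be `true` and `survivesChildFailure(plain)` should be `false`, which is why a long-lived fire-and-forget scope is usually built on a SupervisorJob.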
Check this out: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-supervisor-job.html
Matteo Mirk
07/20/2022, 1:55 PM
> If the component lives anyway for the whole life of the application, then you don’t have to cancel it.
Yes, that's our case: the interceptor lives as long as the service does. Is it okay then to instantiate the CoroutineScope inside it?
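Putting the suggestions from this thread together, the setup under discussion might look like the sketch below. It is not a confirmed answer from the thread. The `EventPublisher` interface (with a suspending `publishError`) and the `PublishingInterceptor` class are assumptions, since neither is shown above, and the dispatcher and coroutine name are arbitrary.

```kotlin
import io.grpc.ForwardingServerCall
import io.grpc.Metadata
import io.grpc.ServerCall
import io.grpc.ServerCallHandler
import io.grpc.ServerInterceptor
import io.grpc.Status
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch

// Assumed shape of the publisher; the real interface is not shown in the thread.
interface EventPublisher {
    suspend fun publishError(status: Status)
}

class PublishingServerCall<ReqT, RespT>(
    next: ServerCall<ReqT, RespT>,
    private val publisher: EventPublisher,
    private val scope: CoroutineScope
) : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(next) {
    override fun close(status: Status, trailers: Metadata) {
        if (!status.isOk) {
            // Fire and forget: launch returns immediately, so close() is not held up.
            scope.launch { publisher.publishError(status) }
        }
        super.close(status, trailers)
    }
}

// Hypothetical interceptor; in the thread it is a Spring-managed singleton.
class PublishingInterceptor(private val publisher: EventPublisher) : ServerInterceptor {
    // Lives as long as the interceptor, so it is never cancelled explicitly here.
    private val scope = CoroutineScope(
        SupervisorJob() + Dispatchers.Default + CoroutineName("error-publisher")
    )

    override fun <ReqT, RespT> interceptCall(
        call: ServerCall<ReqT, RespT>,
        headers: Metadata,
        next: ServerCallHandler<ReqT, RespT>
    ): ServerCall.Listener<ReqT> =
        next.startCall(PublishingServerCall(call, publisher, scope), headers)
}
```

With no CoroutineExceptionHandler in the scope, a failure inside `publishError` would end up in the thread's uncaught-exception handler, so adding a handler that logs the error is probably worth considering.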