
Jacob K

05/05/2023, 11:05 AM
I'm looking for some advice/insight... I'm having trouble understanding how/if https://github.com/grpc/grpc-java/blob/master/api/src/main/java/io/grpc/Contexts.java#L44 works well together with the Kotlin GrpcContextElement https://github.com/grpc/grpc-kotlin/blob/master/stub/src/main/java/io/grpc/kotlin/GrpcContextElement.kt#L27
It seems that the initial GrpcContextElement is created when the server call is created, and that it relies on Kotlin's coroutine system to make the context available on whichever thread the coroutine is currently running on. But once I create a new context with Context.current().withValue(...) and pass it on with Contexts.interceptCall(...), I worry that this newly created context doesn't "inherit" that coroutine-specific logic. The current interceptor looks something like this:
data class DebugInfo(val enabled: Boolean, var debugCtx: DebugContext?)

val DEBUG_KEY = Context.key<DebugInfo>("debug-info")

val DEBUG_INFO_CTX =
    Metadata.Key.of(
        "x-debug-info-ctx${Metadata.BINARY_HEADER_SUFFIX}",
        ProtoUtils.metadataMarshaller(DebugContext.getDefaultInstance()))

class DebugInfoInterceptor : ServerInterceptor {
  override fun <ReqT : Any?, RespT : Any?> interceptCall(
      call: ServerCall<ReqT, RespT>,
      headers: Metadata,
      next: ServerCallHandler<ReqT, RespT>
  ): ServerCall.Listener<ReqT> {
    val debug = headers.get(DEBUG_HEADER)?.let { it == "true" } ?: false
    val debugInfo = DebugInfo(debug, null)
    val debugInfoContext = Context.current().withValue(DEBUG_KEY, debugInfo)
    val debugInfoCall =
        object : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
          override fun close(status: Status, trailers: Metadata) {
            debugInfo.debugCtx?.let { trailers.put(DEBUG_INFO_CTX, it) }
            super.close(status, trailers)
          }
        }

    return Contexts.interceptCall(debugInfoContext, debugInfoCall, headers, next)
  }
}
and the corresponding server stub:
class SomeService() : SomeServiceGrpcKt.SomeServiceCoroutineImplBase() {
  override suspend fun someRpc(request: SomeRequest): SomeResponse {
    //
    // do suspending stuff
    //
    val debugInfo = DEBUG_KEY.get()
    if (debugInfo.enabled) {
      debugInfo.debugCtx = DebugContext.newBuilder().setSomething().build()
    }
    return SomeResponse.newBuilder().build()
  }
}
I've been digging through the source but I can't convince myself that this is safe 😉

Minsoo Cheong

05/08/2023, 1:18 AM
From what I know:
• Context.current() fetches the context object from a ThreadLocal.
• GrpcContextElement, which implements ThreadContextElement, sets the Context object in the ThreadLocal of the thread that the current coroutine is executing on (and restores the previous value afterwards), which makes the Context object accessible through Context.current().
So yes, it will be safe to use your code. To further verify this, try printing the hash code of each context object that is retrieved in the interceptor and within the RPC method.
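The mechanism described above can be tried outside a server. The following is a minimal sketch, assuming GrpcContextElement from grpc-kotlin-stub can be constructed directly from an io.grpc.Context and that kotlinx-coroutines is on the classpath. DEMO_KEY and readAfterSuspension are hypothetical names that stand in for DEBUG_KEY and the RPC method. The context built with withValue(...) is handed to the coroutine through the element, so the value is expected to remain readable after a suspension point even if the coroutine resumes on another thread.

```kotlin
import io.grpc.Context
import io.grpc.kotlin.GrpcContextElement
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical key, standing in for DEBUG_KEY from the interceptor.
val DEMO_KEY: Context.Key<String> = Context.key("demo")

// Builds a derived gRPC context (as the interceptor does) and reads the
// value back inside a coroutine after it has suspended.
fun readAfterSuspension(): String? = runBlocking {
  val ctx = Context.current().withValue(DEMO_KEY, "from-interceptor")

  // The element attaches ctx to the worker thread each time the coroutine
  // resumes and detaches it again when the coroutine suspends.
  withContext(Dispatchers.Default + GrpcContextElement(ctx)) {
    delay(10) // suspension point; resumption may happen on another thread
    DEMO_KEY.get()
  }
}

fun main() {
  println(readAfterSuspension())
}
```

In a real server the element is not created by hand. This sketch only mirrors what is described above: the element is created from whatever Context is current when the call starts, which is the derived context if the interceptor used Contexts.interceptCall(...).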

Wesley Hartford

05/09/2023, 5:29 PM
I do something very similar in an interceptor, and the context values added by the interceptor are available to the service's RPC functions.
The only case where I've had to manually propagate context elements is when they are used by some type of deferred processing. The case that comes to mind is a lookup that asynchronously populates a cache. The cache implementation I'm using has its own thread pool to load values; when that cache-loading code is invoked, it doesn't have the gRPC context because the GrpcContextElement didn't automatically get passed along.
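A minimal sketch of that situation and one way to handle it, assuming a plain java.util.concurrent thread pool in place of the cache's loader pool. USER_KEY and readOnPool are hypothetical names. Without help, a task on the pool thread sees the root context; wrapping the task with Context.wrap(...) captures the caller's context and attaches it for the duration of the task.

```kotlin
import io.grpc.Context
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical key; any Context.Key set by an interceptor behaves the same way.
val USER_KEY: Context.Key<String> = Context.key("user")

// Reads USER_KEY on a pool thread, optionally carrying the caller's gRPC
// context across the thread hop.
fun readOnPool(propagate: Boolean): String? {
  val pool = Executors.newSingleThreadExecutor()
  val ctx = Context.current().withValue(USER_KEY, "alice")
  val previous = ctx.attach() // simulate being inside an RPC
  try {
    val task = Callable<String?> { USER_KEY.get() }
    // wrap() returns a Callable that attaches the captured context around the call.
    val toRun = if (propagate) Context.current().wrap(task) else task
    return pool.submit(toRun).get()
  } finally {
    ctx.detach(previous)
    pool.shutdown()
  }
}

fun main() {
  println(readOnPool(propagate = false)) // value is not visible on the pool thread
  println(readOnPool(propagate = true))  // value travels with the wrapped task
}
```

When the whole executor is under your control, Context.currentContextExecutor(executor) is an alternative that wraps every submitted task. Whether a particular cache library lets you supply such an executor depends on that library.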