I was doing some diligence over the weekend to see...
# ktor
k
I was doing some diligence over the weekend to see how ktor would hold up for some of our needs. I was checking to see how ktor handles coroutine suspend / resume because we have a need to use structured logging in our apps. We’ve traditionally handled things like requestId in spring by using MDC.putCloseable in an interceptor. I put together a simple example below with ktor. This seems to be working out of the box without me having to do anything special, but it seems like I’m just getting lucky (or maybe false positives) because it’s not what I would have expected.
Copy code
fun main(args: Array<String>) {
    val server = embeddedServer(Netty, port = 9000) {
        intercept(ApplicationCallPipeline.Infrastructure){
            val requestId = UUID.randomUUID()
            MDC.putCloseable("req.Id", requestId.toString()).use {
                <http://this.context.application.log.info|this.context.application.log.info>("Interceptor")
                proceed()
            }
        }
        routing {
            get("/") {
                <http://call.application.log.info|call.application.log.info>("Inside /")
                call.respondText("Hello World!", ContentType.Text.Plain)
            }
        }
    }
    server.start(wait = true)
}
I tested using
wrk -t6 -c200 -d15s <http://localhost:9000/>
just to get a little bit of concurrency. Added context: https://github.com/Kotlin/kotlinx.coroutines/issues/119
1
📝 1
Okay, was finally able to reproduce what I was expecting to find by moving the log statement after proceed in the interceptor.
Now the real question is, what are my options here with ktor?
d
Let me check, but what elizarov says makes sense. It looks hacky, but should work. So I guess that your question is how to change your code to use it, right? Ideally would be nice to use an option for structured logging that do not rely on ThreadLocal stuff, and use instances instead. Not sure if MDC has something for that. If not, what we can try is to use the coroutine interceptor to do as required:
Copy code
MDC.put(key, value)
MDC.remove(key)
Copy code
val REQ_ID = AttributeKey<String>("REQ_ID")

fun ApplicationCall.checkReqId() {
    check(MDC.get("req.Id") == attributes.get(REQ_ID))
}

fun main(args: Array<String>) {
    val server = embeddedServer(Netty, port = 9000) {
        intercept(ApplicationCallPipeline.Infrastructure) {
            val requestId = UUID.randomUUID()
            call.attributes.put(REQ_ID, requestId.toString())
            MDC_associate("req.Id", requestId.toString()) {
                call.checkReqId()
                <http://this.context.application.log.info|this.context.application.log.info>("Interceptor[start] ${MDC.get("req.Id")}")
                proceed()
                call.checkReqId()
                <http://this.context.application.log.info|this.context.application.log.info>("Interceptor[end] ${MDC.get("req.Id")}")
            }
        }
        routing {
            get("/") {
                call.checkReqId()
                <http://call.application.log.info|call.application.log.info>("Inside / ${MDC.get("req.Id")}")
                call.checkReqId()
                call.respondText("Hello World! ${MDC.get("req.Id")}", ContentType.Text.Plain)
                call.checkReqId()
            }
        }
    }
    server.start(wait = true)
}

suspend fun MDC_associate(key: String, value: String, block: suspend () -> Unit) {
    withContext(MDCSetContext(key, value, DefaultDispatcher)) {
        block()
    }
    MDC.remove(key)
}

class MDCSetContext(
    private var mdcKey: String,
    private var mdcValue: String,
    private val dispatcher: CoroutineDispatcher
) : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        dispatcher.interceptContinuation(Wrapper(continuation))

    inner class Wrapper<T>(private val continuation: Continuation<T>): Continuation<T> {
        private inline fun wrap(block: () -> Unit) {
            MDC.put(mdcKey, mdcValue)
            block()
        }

        override val context: CoroutineContext get() = continuation.context
        override fun resume(value: T) = wrap { continuation.resume(value) }
        override fun resumeWithException(exception: Throwable) = wrap { continuation.resumeWithException(exception) }
    }
}
this seems to work
I have added the
checkReqId
and associated the requestId to the call, so you can verify that it works
Edit: Actually the
MDC.remove
from my example potentially wouldn’t remove it from some thread locals of the MDC. But at least should work when wrapping the whole route
k
Thanks, it makes more sense now. I had a mental block on how to apply that with ktor. For MDC itself, it doesn’t appear that there’s a way to avoid using ThreadLocal.
There may be a way to wrap a logging implementation and capture and correlate key/value pairs separate from MDC. The logstash-logback-encoder has support for scenarios like this, but only via additional arguments to the log statement itself. For context: (See StructuredArguments in readme) https://github.com/logstash/logstash-logback-encoder
d
Yep, let me draft a possible way for using this
What about…:
Copy code
fun Route.handleRoot() {
    get("/") {
        logAttach("req.Id", UUID.randomUUID().toString()) {
            logInfo("Inside /")
            call.respondText("Hello World!")
        }
    }
}

val LOG_ATTRIBS = AttributeKey<LinkedHashMap<String, String>>("LOG_ATTRIBS")

val ApplicationCall.logAttribs get() = attributes.computeIfAbsent(LOG_ATTRIBS) { LinkedHashMap() }

inline fun PipelineContext<Unit, ApplicationCall>.logAttach(key: String, value: String, callback: () -> Unit) {
    call.logAttribs[key] = value
    try {
        callback()
    } finally {
        call.logAttribs.remove(key)
    }
}

fun PipelineContext<Unit, ApplicationCall>.logInfo(text: String) {
    <http://call.application.log.info|call.application.log.info>(text, call.logAttribs) // Does this logger support a map with attributes?
}
If you prefer to have a separate instance for logging, you can create your own logger wrapping another logger (but you will need to have or associate one instance per call so it works as expected):
Copy code
fun Route.handleRoot() {    
    get("/") {
        val log = MyStructuredLogger(application.log)
        log.attach("req.Id", UUID.randomUUID().toString()) {
            <http://log.info|log.info>("Inside /")
            call.respondText("Hello World!")
        }
    }
}

class MyStructuredLogger(val logger: Logger) {
    val attributes = LinkedHashMap<String, String>()
    
    inline fun attach(key: String, value: String, callback: () -> Unit) {
        attributes[key] = value
        try {
            callback()
        } finally {
            attributes.remove(key)
        }
    }
    
    fun info(text: String) {
        <http://logger.info|logger.info>(text, attributes)
    }
}
Or doing this automatically:
Copy code
fun Route.handleRoot() {
    get("/") {
        log.attach("req.Id", UUID.randomUUID().toString()) {
            <http://log.info|log.info>("Inside /")
            call.respondText("Hello World!")
        }
    }
}

val StructuredLoggerAttr = AttributeKey<StructuredLogger>("StructuredLogger")
val PipelineContext<Unit, ApplicationCall>.log get() = this.call.attributes.computeIfAbsent(StructuredLoggerAttr) { StructuredLogger(this.application.log) }

class StructuredLogger(val logger: Logger) {
    val attributes = LinkedHashMap<String, String>()

    inline fun attach(key: String, value: String, callback: () -> Unit) {
        attributes[key] = value
        try {
            callback()
        } finally {
            attributes.remove(key)
        }
    }

    fun info(text: String) {
        <http://logger.info|logger.info>(text, attributes)
    }
}
k
The first approach looks pretty appealing. Forgive my ignorance, but what is the lifecycle of the AttributeKey? i.e. Is it scoped to the request?
d
There is one application call per request/response
Attributes is a lightweight typed injection system
AttributeKey is the key containing the type of the instance to attach
so you are attaching one logger per call/request/response
and it will be collected once the call is done
k
Yeah, this is perfect then.
Very cool, thanks a bunch for the pointers.
d
👍
k
Pretty slick. This is the working sample from your last suggestion.
Copy code
import io.ktor.application.ApplicationCall
import io.ktor.application.application
import io.ktor.application.call
import io.ktor.application.log
import io.ktor.pipeline.PipelineContext
import io.ktor.util.AttributeKey
import net.logstash.logback.marker.Markers.appendEntries
import org.slf4j.Logger

val StructuredLoggerAttr = AttributeKey<StructuredLogger>("StructuredLogger")
val PipelineContext<Unit, ApplicationCall>.log get() = this.call.attributes.computeIfAbsent(StructuredLoggerAttr) { StructuredLogger(this.application.log) }

class StructuredLogger(val logger: Logger) {
    val attributes = LinkedHashMap<String, String>()

    inline fun attach(key: String, value: String, callback: () -> Unit) {
        attributes[key] = value
        try {
            callback()
        } finally {
            attributes.remove(key)
        }
    }

    fun info(text: String) {
        <http://logger.info|logger.info>(appendEntries(attributes), text)
    }
}
🎊 1
d
I would like to add this as a sample to the samples repo and documentation if you don’t main. Since I’m not too familiar with this right now. In addition to add the dependency
compile 'net.logstash.logback:logstash-logback-encoder:5.1'
, do you know how to configure it to log the JSON to the stdout to illustrate how does it work?
k
Copy code
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="net.logstash.logback.encoder.LogstashEncoder" />
    </appender>

    <root level="trace">
        <appender-ref ref="STDOUT"/>
    </root>

    <logger name="org.eclipse.jetty" level="INFO"/>
    <logger name="io.netty" level="INFO"/>
</configuration>
d
Nice. Thanks! 👍
k
I created a gist here for the full logger implementation.
feel free to steal it if you want
was going to mention you in it anyways
d
no need for that, but what I can do is to create the structure here https://github.com/ktorio/ktor-samples and you make a PR adding that code, so you get your credit too while other people can use the code
k
Sure, can do.
👍 1
k
I’ll probably drop offline for a while for some father’s day time with my kids.
👍 1