is anyone out there using the jdk9+ <java.net>.htt...
# server
r
is anyone out there using the jdk9+ java.net.http.HttpClient with sendAsync in combination with CompletionStage<T>.await() to make it suspendable? https://www.javaspecialists.eu/archive/Issue271.html got me thinking about the scalability of this solution… parts of the HTTP request are now executed on the common ForkJoinPool threads and parts of it are awaited by coroutine executors. 🤔
a
I’ve certainly used it (and adapted it so that I receive a Flow of ByteBuffers, for example). I didn’t sweat too much about the threading, though: I figured it was largely I/O bound except the decompress (where I had control, if I wanted it)
r
I have code like
client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
    .thenApply { it.body() }
    .thenApply { deserialize(it) }
    .await()
Meaning that reading the body and deserializing (using the Jackson ObjectMapper) is done in Java world on the executor used by HttpClient, and then there is a switch to a coroutine
Question is whether it’s better to move the deserialization etc. off the HttpClient executor
care to share your ByteBuffer subscriber solution, @araqnid?
a
My quick reaction is that it makes sense to take deserialization off the I/O thread pool, either the Java way (thenApplyAsync) or the Kotlin way (await().let { deserialize(it) } or similar)
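A minimal sketch of the `thenApplyAsync` option, using only JDK classes. The two named pools and the `deserialize` stub are hypothetical stand-ins (for the HttpClient executor, a parsing pool, and Jackson respectively), and `supplyAsync` stands in for `client.sendAsync(...).thenApply { it.body() }` so the thread hop is visible without a real request:

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors

// Hypothetical stand-in for Jackson deserialization: returns the trimmed body
// together with the name of the thread that did the work.
fun deserialize(body: String): Pair<String, String> =
    body.trim() to Thread.currentThread().name

fun fetchAndParse(): Pair<String, String> {
    // Stand-in for the executor used by HttpClient.
    val ioPool = Executors.newSingleThreadExecutor { r -> Thread(r, "http-io") }
    // Separate pool for CPU-bound work such as JSON parsing.
    val cpuPool = Executors.newSingleThreadExecutor { r -> Thread(r, "parse-cpu") }
    try {
        // Stand-in for client.sendAsync(...).thenApply { it.body() }
        val body = CompletableFuture.supplyAsync({ " {\"ok\":true} " }, ioPool)
        // Passing an executor to thenApplyAsync runs this stage on that executor
        // instead of on the thread that completed the previous stage.
        return body.thenApplyAsync({ deserialize(it) }, cpuPool).join()
    } finally {
        ioPool.shutdown()
        cpuPool.shutdown()
    }
}
```

The Kotlin-side equivalent would be to `await()` the raw body and then parse inside something like `withContext(Dispatchers.Default) { deserialize(body) }`, which keeps the choice of dispatcher on the coroutine side.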
r
I was just trying out something like
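import java.io.IOException
import java.io.InputStream
import java.io.UncheckedIOException
import java.net.http.HttpResponse.BodyHandler
import java.net.http.HttpResponse.BodySubscriber
import java.net.http.HttpResponse.BodySubscribers
import java.net.http.HttpResponse.ResponseInfo
import java.util.function.Supplier

// BodyHandler that yields a Supplier, so the JSON is only read when get() is called.
class JsonBodyHandler<W>(private val wClass: Class<W>) : BodyHandler<Supplier<W>> {
    override fun apply(responseInfo: ResponseInfo): BodySubscriber<Supplier<W>> {
        return asJSON(wClass)
    }
}

fun <W> asJSON(targetType: Class<W>): BodySubscriber<Supplier<W>> {
    val upstream = BodySubscribers.ofInputStream()
    // The mapping function only wraps the stream; it does not read from it.
    return BodySubscribers.mapping(upstream) { inputStream: InputStream -> toSupplierOfType(inputStream, targetType) }
}

private fun <W> toSupplierOfType(inputStream: InputStream, targetType: Class<W>): Supplier<W> {
    return Supplier {
        try {
            // use {} closes the stream once Jackson has finished reading.
            inputStream.use { stream ->
                // createObjectMapper() is defined elsewhere; a shared ObjectMapper could be reused here.
                val objectMapper = createObjectMapper()
                objectMapper.readValue(stream, targetType)
            }
        } catch (e: IOException) {
            throw UncheckedIOException(e)
        }
    }
}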
adapted from https://stackoverflow.com/questions/57629401/deserializing-json-using-java-11-httpclient-and-custom-bodyhandler-with-jackson
a
sure, although there’s less to it since they released a jdk9 integration
The reality is that Jackson deserialization needs to block as it’s based on InputStream/Reader, so either buffer everything up or do something more complicated to allow it to run in parallel with the I/O
Since you’re already pulling the body into a string, there’s probably nothing to worry about there: it’s all buffered up
r
I was thinking of doing something better than creating a lot of strings
a
Well, you could write a ByteBuffersInputStream that will accept ByteBuffer instances (and an end-of-input signal) arriving from the Flow and expose that as an InputStream for Jackson to consume (although it would have to block when none were available). That would cut out the buffering.
I didn’t do it, I just aggregated all the buffers together into one big one and passed that to Jackson: https://github.com/araqnid/library-versions/blob/master/src/main/kotlin/org/araqnid/libraryversions/NodeJsResolver.kt
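A rough sketch of what such a `ByteBuffersInputStream` might look like, using only JDK classes. This is an illustration of the idea described above, not code from the linked repository: the method names `offer` and `complete` are made up here, error signalling is left out, and the reading side blocks whenever no buffer has arrived yet, so it needs to run on a thread that is allowed to block.

```kotlin
import java.io.InputStream
import java.nio.ByteBuffer
import java.util.concurrent.LinkedBlockingQueue

class ByteBuffersInputStream : InputStream() {
    private val queue = LinkedBlockingQueue<ByteBuffer>()
    private var current: ByteBuffer? = null   // touched only by the reading thread
    private var finished = false              // touched only by the reading thread

    /** Producer side: hand over the next buffer (e.g. from a subscriber's onNext). */
    fun offer(buffer: ByteBuffer) {
        queue.put(buffer)
    }

    /** Producer side: signal end of input (e.g. from onComplete). */
    fun complete() {
        queue.put(END)
    }

    // Blocks until a buffer with remaining bytes is available; returns null at end of input.
    private fun ensureBuffer(): ByteBuffer? {
        while (!finished) {
            val buf = current
            if (buf != null && buf.hasRemaining()) return buf
            val next = queue.take()
            // Identity check: ByteBuffer.equals compares contents, so any empty buffer would match.
            if (next === END) finished = true else current = next
        }
        return null
    }

    override fun read(): Int {
        val buf = ensureBuffer() ?: return -1
        return buf.get().toInt() and 0xFF
    }

    override fun read(b: ByteArray, off: Int, len: Int): Int {
        if (len == 0) return 0
        val buf = ensureBuffer() ?: return -1
        val n = minOf(len, buf.remaining())
        buf.get(b, off, n)
        return n
    }

    private companion object {
        // Sentinel marking end of input.
        val END: ByteBuffer = ByteBuffer.allocate(0)
    }
}
```

Jackson could then consume the stream with `readValue(stream, type)` while buffers are still arriving, at the cost of parking one thread for the duration of the read.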
r
Yeah, I wanted to build a solution that does not block the httpclient executor threads.
… nor make blocking calls in the coroutine scope
a
imho using a StringReader/ByteArrayInputStream isn’t really “blocking” in that it’s not going to wait for I/O, although that’s something only the developer knows, not the tools that just see Reader/InputStream usage.
r
trying to understand which thread pool the BodySubscriber<Supplier<W>> solution from the SO post linked above executes on. As I understand it, on the Java ForkJoinPool?
have to debug…
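One thing that can be checked without a debugger: as far as the `BodySubscribers.mapping` Javadoc describes it, the mapping function runs on the client's executor, but with the Supplier approach that function only builds the Supplier. The body of a Supplier runs on whichever thread calls `get()`. The sketch below shows just that second part with plain JDK classes; both pools are hypothetical stand-ins and no HttpClient is involved.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.function.Supplier

// Returns (thread that created the Supplier, thread that ran its body).
fun whereDoesSupplierRun(): Pair<String, String> {
    // Stand-in for the HttpClient executor.
    val clientPool = Executors.newSingleThreadExecutor { r -> Thread(r, "http-client") }
    // Stand-in for wherever the caller chooses to do the blocking read.
    val callerPool = Executors.newSingleThreadExecutor { r -> Thread(r, "caller") }
    try {
        // Like the mapping function: runs on the client pool but only constructs the Supplier.
        val future = CompletableFuture.supplyAsync({
            val createdOn = Thread.currentThread().name
            createdOn to Supplier { Thread.currentThread().name }
        }, clientPool)
        val (createdOn, supplier) = future.join()
        // The deferred work executes on the thread that invokes get().
        val ranOn = callerPool.submit(Callable { supplier.get() }).get()
        return createdOn to ranOn
    } finally {
        clientPool.shutdown()
        callerPool.shutdown()
    }
}
```

If that holds for the real handler too, calling `get()` from a coroutine would put the blocking Jackson read on the coroutine's thread, so wrapping it in something like `withContext(Dispatchers.IO)` may be worth considering.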
thanks for your code by the way