gotoOla
12/21/2018, 12:12 PM
import our.company.toJsonString
import our.company.ApplicationMetrics
import io.ktor.client.HttpClient
import io.ktor.client.engine.apache.Apache
import io.ktor.client.engine.config
import io.ktor.client.request.post
import io.ktor.client.response.HttpResponse
import org.slf4j.LoggerFactory
/**
 * Posts batches of [Update]s as JSON to the `shared` endpoint below [baseUrl] and counts every
 * update in `ApplicationMetrics.updatesSent`, labelled "true" on success and "false" on failure.
 *
 * @param baseUrl base URL of the receiving service; "/shared" is appended to build the target URL.
 */
class UpdateClient(
    baseUrl: String
) {
    private val logger = LoggerFactory.getLogger(UpdateClient::class.java)

    // One shared client on the Apache engine, with 30_000 for both socket and connect timeout.
    private val client = HttpClient(Apache.config {
        socketTimeout = 30_000
        connectTimeout = 30_000
    })

    private val updateUrl = "$baseUrl/shared"

    /**
     * Serializes [updates] to JSON and POSTs them to the update endpoint.
     *
     * Failures are logged and counted, never propagated to the caller. The one exception is
     * coroutine cancellation, which is rethrown so that a cancelled caller actually stops.
     *
     * @param updates the updates to send; the metric is incremented by `updates.size`.
     */
    suspend fun postUpdates(updates: List<Update>) {
        try {
            val payload = updates.toJsonString()
            // The Apache engine requires the response to be released explicitly; without the
            // close() every call keeps its response resources allocated.
            client.post<HttpResponse>(updateUrl) {
                body = payload
            }.close()
            ApplicationMetrics.updatesSent.labels("true").inc(updates.size.toDouble())
        } catch (e: java.util.concurrent.CancellationException) {
            // Cancellation is not a delivery failure: do not log or count it, just propagate.
            throw e
        } catch (e: Exception) {
            logger.error("Could not post updates", e)
            ApplicationMetrics.updatesSent.labels("false").inc(updates.size.toDouble())
        }
    }
}
The service that calls this looks something like this
/**
 * Calls `dynamoClient.increaseSharesAsync` for every url, turns each successful result into an
 * [Update], and hands the de-duplicated updates to `updateClient` without waiting for that call.
 *
 * The calling thread is blocked (via `runBlocking`) until every Dynamo call has finished or failed.
 * A url whose Dynamo call fails, or whose result lacks the expected attributes, is logged and skipped.
 *
 * @param urls the urls to process.
 * @throws RuntimeException if no url at all produced a usable Dynamo result.
 */
fun businessLogic(urls: List<Url>) {
    runBlocking {
        // NOTE(review): GlobalScope detaches these coroutines from the runBlocking scope; kept
        // as in the original so threading behaviour is unchanged.
        val urlsInDynamo = urls.map { url ->
            GlobalScope.async {
                try {
                    val attributes = dynamoClient.increaseSharesAsync(url).attributes()
                    Update(
                        checkNotNull(attributes["aString"]?.s()) { "Dynamo result has no 'aString' attribute" },
                        checkNotNull(attributes["aValue"]?.n()) { "Dynamo result has no 'aValue' attribute" }.toLong()
                    )
                } catch (e: Exception) {
                    logger.warn("Unable to write get result from the update to dynamo", e)
                    null
                }
            }
        }.awaitAll().filterNotNull()

        if (urlsInDynamo.isEmpty()) {
            throw RuntimeException("Non of the urls were updated in Dynamo")
        }

        // Fire and forget
        GlobalScope.launch {
            // Keep one update per aString: the one with the highest aValue (the first one wins
            // on ties). groupBy never produces an empty group, so reduce always has an element.
            val updates = urlsInDynamo
                .groupBy { it.aString }
                .values
                .map { group ->
                    group.reduce { best, next -> if (next.aValue > best.aValue) next else best }
                }
            // This is where we call the update client at about 600 rps
            updateClient.postUpdates(updates)
        }
    }
}
And the data class and extension we use
/**
 * Serializes the receiver to a JSON string using `DefaultObjectMapper`.
 *
 * @param pretty when true, the writer is created with `SerializationFeature.INDENT_OUTPUT`.
 * @return the JSON representation of the receiver.
 */
fun Any.toJsonString(pretty: Boolean = false): String {
    val jsonWriter = if (pretty) {
        DefaultObjectMapper.writer(SerializationFeature.INDENT_OUTPUT)
    } else {
        DefaultObjectMapper.writer()
    }
    return jsonWriter.writeValueAsString(this)
}
/**
 * A single update sent to the update service.
 *
 * @property aString string key of the update.
 * @property aValue numeric value associated with [aString].
 */
data class Update(
    val aString: String,
    val aValue: Long
)
gotoOla
12/21/2018, 12:14 PMgotoOla
12/21/2018, 12:14 PMgotoOla
12/21/2018, 12:15 PMe5l
12/21/2018, 12:21 PMHttpResponse
here :`client.post<HttpResponse>(updateUrl)`gotoOla
12/21/2018, 12:21 PMgotoOla
12/21/2018, 12:21 PM
client.post<HttpResponse>(updateUrl) {
body = payload
}.close()
?e5l
12/21/2018, 12:22 PMgotoOla
12/21/2018, 12:22 PMgotoOla
12/21/2018, 12:25 PMgotoOla
12/21/2018, 12:26 PMe5l
12/21/2018, 12:26 PMgotoOla
12/21/2018, 12:29 PMgotoOla
12/21/2018, 12:30 PMe5l
12/21/2018, 12:32 PMApache
requires to release response explicitly(for the internal stuff like buffers and pools), and for a stream-bodye5l
12/21/2018, 12:33 PMresponse.body
is stream type: ByteReadChannel
gotoOla
12/21/2018, 12:34 PMgotoOla
12/21/2018, 12:35 PMAlexander Weickmann
06/17/2020, 4:23 PMe5l
06/25/2020, 11:22 AM1.3.2
the HttpResponse
is not closeable anymore, and you shouldn't close itAlexander Weickmann
07/20/2020, 9:01 AM