Nikky
11/01/2018, 4:47 PM@ExperimentalCoroutinesApi
private fun ProducerScope<ApiMessage>.deserializer() = object : ResponseDeserializable<Unit> {
override fun deserialize(reader: Reader) {
<http://logger.info|logger.info>("connection open")
reader.forEachLine { line ->
launch {
val msg = ApiMessage.decode(line)
if (msg.event != "api_connect") {
send(msg)
}
}
}
}
}
private fun CoroutineScope.messageBroadcast() = broadcast<ApiMessage>(context = <http://Dispatchers.IO|Dispatchers.IO>) {
while (isActive) {
<http://logger.info|logger.info>("opening connection")
val (_, response, result) = keepOpenManager.request(Method.GET, "$host/api/stream")
.responseObject(deserializer())
when (result) {
is Result.Success -> {
<http://logger.info|logger.info>("connection closed")
}
is Result.Failure -> {
logger.error("connection error")
logger.error("response: $response")
logger.error/*(result.error.exception)*/ {
result.error.localizedMessage
}
// throw result.error.exception
}
}
delay(1000) // reconnect delay
}
}