Nikky
11/01/2018, 2:29 AMNikky
11/01/2018, 2:34 AMNikky
11/01/2018, 2:35 AMprivate fun ProducerScope<ApiMessage>.deserializer() = object: ResponseDeserializable<Job> {
/**
 * Starts a coroutine on [Dispatchers.IO] that reads newline-delimited messages from
 * [inputStream], decodes every complete line with [ApiMessage.decode] and sends the
 * result to this producer's channel.
 *
 * The stream is polled every 10 ms with a cancellable `delay`, so cancelling the
 * returned job stops the reader even while no data is available. The coroutine also
 * ends when the stream reports end-of-input or the channel is closed for sending.
 * The stream is closed when the coroutine finishes. A trailing line that is not
 * terminated by a newline is not emitted.
 *
 * @param inputStream the stream to consume; it is closed by this function.
 * @return the [Job] of the reading coroutine.
 */
override fun deserialize(inputStream: InputStream): Job {
    return launch(Dispatchers.IO) {
        inputStream.use { input ->
            logger.info("connection opened")
            val newline: Byte = 0x0A
            // Raw bytes of the current, not yet terminated line. Lines are decoded only
            // once complete, so a multi-byte character split across two reads is not
            // corrupted. NOTE(review): assumes the payload is UTF-8 (JSON) — confirm.
            val pending = java.io.ByteArrayOutputStream()
            val buf = ByteArray(1024)
            while (isActive) {
                delay(10)
                // Poll instead of blocking in read(); delay() throws on cancellation,
                // which lets use {} close the stream and end the coroutine.
                while (input.available() <= 0) {
                    if (isClosedForSend) return@launch
                    delay(10)
                }
                val count = input.read(buf)
                StreamConnection.logger.trace(String.format("read %d chars", count))
                if (count < 0) break
                if (count == 0) continue
                logger.debug("json: ${String(buf, 0, count, Charsets.UTF_8)}")
                // Single pass over the chunk: emit one message per newline found.
                var lineStart = 0
                for (i in 0 until count) {
                    if (buf[i] != newline) continue
                    pending.write(buf, lineStart, i - lineStart)
                    val line = String(pending.toByteArray(), Charsets.UTF_8)
                    pending.reset()
                    lineStart = i + 1
                    send(ApiMessage.decode(line))
                }
                // Keep the unterminated remainder for the next read.
                pending.write(buf, lineStart, count - lineStart)
            }
        }
    }
}
}
Nikky
11/01/2018, 2:35 AMNikky
11/01/2018, 2:43 AMoverride fun deserialize(reader: Reader) {
// Reads the response line by line; each line is decoded into an ApiMessage on the
// calling thread before a coroutine is launched to send it.
reader.forEachLine { line ->
val msg = ApiMessage.decode(line)
// forEachLine takes a plain (non-suspending) lambda; send() is presumably a
// suspending channel send, hence the launch.
// NOTE(review): one coroutine per line gives no ordering guarantee between messages
// if the enclosing scope's dispatcher is multi-threaded — confirm this is acceptable.
launch {
send(msg)
}
}
}
Derk-Jan Karrenbeld
11/01/2018, 1:38 PMNikky
11/01/2018, 1:51 PMNikky
11/01/2018, 1:52 PMNikky
11/01/2018, 1:55 PMDerk-Jan Karrenbeld
11/01/2018, 2:03 PMDerk-Jan Karrenbeld
11/01/2018, 2:03 PMDerk-Jan Karrenbeld
11/01/2018, 2:03 PMNikky
11/01/2018, 2:08 PMNikky
11/01/2018, 2:09 PMDerk-Jan Karrenbeld
11/01/2018, 2:47 PMNikky
11/01/2018, 3:30 PMDerk-Jan Karrenbeld
11/01/2018, 3:33 PM finishes so it is not closed when the coroutine starts?
Derk-Jan Karrenbeld
11/01/2018, 4:44 PMDerk-Jan Karrenbeld
11/01/2018, 4:44 PMoverride fun deserialize(response: Response): T {
Derk-Jan Karrenbeld
11/01/2018, 4:44 PMNikky
11/01/2018, 4:45 PMDerk-Jan Karrenbeld
11/01/2018, 4:45 PMresponse.body.toStream().use { stream ->
Derk-Jan Karrenbeld
11/01/2018, 4:45 PMDerk-Jan Karrenbeld
11/01/2018, 4:46 PMNikky
11/01/2018, 4:47 PM@ExperimentalCoroutinesApi
/**
 * Builds a [ResponseDeserializable] that forwards the lines of a streamed response
 * to this producer's channel as [ApiMessage]s.
 *
 * Every line is decoded with [ApiMessage.decode] inside its own child coroutine.
 * Messages whose `event` is `"api_connect"` are dropped; all others are sent.
 */
private fun ProducerScope<ApiMessage>.deserializer() = object : ResponseDeserializable<Unit> {
    override fun deserialize(reader: Reader) {
        logger.info("connection open")
        reader.forEachLine { line ->
            // forEachLine takes a plain (non-suspending) lambda, so the suspending
            // send() has to run inside a coroutine.
            // NOTE(review): one launch per line gives no ordering guarantee between
            // messages on a multi-threaded dispatcher — confirm this is acceptable.
            launch {
                val msg = ApiMessage.decode(line)
                if (msg.event != "api_connect") {
                    send(msg)
                }
            }
        }
    }
}
/**
 * Creates a broadcast channel of [ApiMessage]s fed by the `$host/api/stream` endpoint.
 *
 * While the coroutine is active it issues a GET request through `keepOpenManager` and
 * hands the response to [deserializer]. When the request returns, the outcome is
 * logged and a new request is made after a one-second delay.
 */
private fun CoroutineScope.messageBroadcast() = broadcast<ApiMessage>(context = Dispatchers.IO) {
    while (isActive) {
        logger.info("opening connection")
        // The request result is only inspected once responseObject() has returned.
        val (_, response, result) = keepOpenManager.request(Method.GET, "$host/api/stream")
            .responseObject(deserializer())
        when (result) {
            is Result.Success -> {
                logger.info("connection closed")
            }
            is Result.Failure -> {
                // Failures are logged, not rethrown, so the loop goes on to reconnect.
                logger.error("connection error")
                logger.error("response: $response")
                logger.error/*(result.error.exception)*/ {
                    result.error.localizedMessage
                }
//                throw result.error.exception
            }
        }
        delay(1000) // reconnect delay
    }
}
Nikky
11/01/2018, 4:47 PMDerk-Jan Karrenbeld
11/01/2018, 4:48 PMoverride fun deserialize(reader: Reader) {
Derk-Jan Karrenbeld
11/01/2018, 4:48 PMDerk-Jan Karrenbeld
11/01/2018, 4:48 PMoverride fun deserialize(response: Response): T {
Nikky
11/01/2018, 4:56 PM