#reaktive

trashcoder

03/16/2022, 10:50 PM
hi, i am new to reaktive and Rx in general. i am trying to listen to server-sent events (with ktor cio) and i would like to know if any of the existing ways to create observables might fit my needs. i thought i could use the reaktive coroutine functions, but since there were only `maybe`, `single` and `completable`, i ended up creating my own observable. it seems to work, but i would like to know if this is the way it is meant to be used, especially in combination with coroutines. (see thread for code)
observable<String> { emitter ->
    val executor = ioScheduler.newExecutor()
    emitter.setDisposable(executor)
    executor.submit {
        GlobalScope.launch {
            httpStatement.execute { httpResponse ->
                val channel: ByteReadChannel = httpResponse.body()
                while (!channel.isClosedForRead) {
                    channel.read {
                        emitter.onNext(it.decodeString())
                    }
                }
            }
        }
    }
}.subscribe {
    println(it)
}

Arkadii Ivanov

03/17/2022, 9:51 AM
Hello! First of all, it seems that you are just executing a one-time HTTP request, not listening to the server. As far as I understand the code, it just reads arbitrary bytes from the connection, decodes the bytes into a string and sends it downstream. So either you need to long-poll the server (execute a request, wait for the response, read the response in full and then execute the request again), or you need something like Socket or WebSocket, where you can listen for packets from the server. Regarding the question of how to create an `Observable` from a coroutine, you can try the code below:
callbackFlow {
    while (isActive) {
        delay(1000L) // Read data
        send("abc") // Send data
    }
}.asObservable() // Convert Flow to Observable

trashcoder

03/17/2022, 7:16 PM
Thx, I will try it out and play around with the conversion into an `Observable` now. As for the request: I use Ktor 2.0.0 in my project as the HTTP client. Now I need to read Server-Sent Events. I googled "KTOR SSE" and found this issue, which then led me here. So I copied this code and modified it a bit to make it work (at least I thought so ^^). I mean, I can use my app normally (the UI is responsive and I can execute other actions on IO) and every time the server sends an event (caused by another app) it appears in the console. So I do a one-time request and get all the events. Isn't that "listening to the server"? This is how I create the `httpStatement`:
val httpStatement = localClient.prepareGet("https://${connectionInfo.bridgeIP}/eventstream/clip/v2") {
    applicationKeyHeader()
    header("Accept", "text/event-stream")
}

Arkadii Ivanov

03/17/2022, 7:25 PM
I have never seen such a usage of GET requests. Usually GET requests are one-time requests: you connect to the server and it responds with data, then you read the data fully and use it. Listening with GET requests is also possible, but this is usually done via long polling: you connect to the server and wait for the response, then you read it fully, close the request and execute a new one. So one command per request. In your example, you are reading bytes from the channel. I'm not sure how it works under the hood, but I would check whether there is a guarantee that every read from the channel is complete and contains the full data. I mean, the command's body may be big and perhaps split across multiple packets.

trashcoder

03/17/2022, 7:38 PM
I just checked the example curl call provided by the server's manufacturer in the documentation about "how to receive events". And guess what?! You are right! There is no GET there:
curl --insecure -N -H 'hue-application-key: <appkey>' -H 'Accept: text/event-stream' https://<ipaddress>/eventstream/clip/v2
So maybe i should really rethink this approach... 🙈

Arkadii Ivanov

03/17/2022, 8:15 PM
Hmm, looks like this is actually a documented thing. You just have to supply the `Accept: text/event-stream` header and the server will stream events back. https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
So yeah, sorry for the confusion.
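Since this is a server-sent events stream, the earlier concern still applies: a single read from the channel may contain only part of an event, or several events at once. Below is a minimal sketch of how the decoded chunks could be buffered and split into complete events before being passed to `onNext`. `SseEvent` and `SseParser` are hypothetical names, not part of Reaktive or Ktor. The sketch assumes each chunk is already correctly decoded text, handles only `\n` and `\r\n` line endings, and ignores the `retry` field.

```kotlin
// Hypothetical helper types for illustration; not part of Reaktive or Ktor.
data class SseEvent(val event: String?, val data: String, val id: String?)

class SseParser {
    private val buffer = StringBuilder()            // text received but not yet terminated by a newline
    private val dataLines = mutableListOf<String>() // "data" fields of the event being assembled
    private var eventType: String? = null
    private var lastId: String? = null              // the last seen id is kept across events

    /** Feeds one decoded chunk and returns every event that this chunk completed. */
    fun feed(chunk: String): List<SseEvent> {
        buffer.append(chunk)
        val events = mutableListOf<SseEvent>()
        while (true) {
            val newline = buffer.indexOf("\n")
            if (newline < 0) break // incomplete line: wait for the next chunk
            val line = buffer.substring(0, newline).removeSuffix("\r")
            buffer.delete(0, newline + 1)
            processLine(line)?.let { events += it }
        }
        return events
    }

    private fun processLine(line: String): SseEvent? {
        if (line.isEmpty()) {
            // A blank line terminates the current event.
            val event = if (dataLines.isEmpty()) null
                        else SseEvent(eventType, dataLines.joinToString("\n"), lastId)
            dataLines.clear()
            eventType = null
            return event
        }
        if (line.startsWith(":")) return null // comment line, often used as a keep-alive

        val colon = line.indexOf(':')
        val field = if (colon < 0) line else line.substring(0, colon)
        // A single space after the colon is not part of the value.
        val value = if (colon < 0) "" else line.substring(colon + 1).removePrefix(" ")
        when (field) {
            "data" -> dataLines += value
            "event" -> eventType = value
            "id" -> lastId = value
        }
        return null
    }
}
```

With this in place, the read loop would call something like `parser.feed(it.decodeString()).forEach(emitter::onNext)` instead of emitting each raw chunk. For example, feeding `id: 1\ndata: {"type":` returns an empty list, and feeding `"update"}\n\n` afterwards returns a single event whose `data` is `{"type":"update"}` and whose `id` is `1`.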

trashcoder

03/17/2022, 8:39 PM
No worries. I have a constant level of confusion. It is all part of the learning process 😉
👍 1