Thread
#reaktive
    trashcoder

    6 months ago
    Hi, I am new to Reaktive and Rx in general. I am trying to listen to server-sent events (with Ktor CIO) and would like to know whether any of the existing ways to create observables fit my needs. I thought I could use the Reaktive coroutine functions, but since there were only maybe, single and completable, I ended up creating my own observable. It seems to work, but I would like to know whether this is how it is meant to be used, especially in combination with coroutines. (code below)
    observable<String> { emitter ->
        val executor = ioScheduler.newExecutor()
        emitter.setDisposable(executor)
        executor.submit {
            GlobalScope.launch {
                httpStatement.execute { httpResponse ->
                    val channel: ByteReadChannel = httpResponse.body()
                    while (!channel.isClosedForRead) {
                        channel.read {
                            emitter.onNext(it.decodeString())
                        }
                    }
                }
            }
        }
    }.subscribe {
        println(it)
    }
    Arkadii Ivanov

    6 months ago
    Hello! First of all, it seems that you are just executing a one-time HTTP request, not listening to the server. As far as I understand the code, it just reads arbitrary bytes from the connection, decodes them into a string and sends it downstream. So either you need to long-poll the server (execute a request, wait for the response, read the response in full, then execute the request again), or you need something like a Socket or WebSocket, where you can listen for packets from the server. Regarding the question of how to create an Observable from a coroutine, you can try the code below:
    callbackFlow {
        while (isActive) {
            delay(1000L) // Read data
            send("abc") // Send data
        }
    }.asObservable() // Convert Flow to Observable
    trashcoder

    6 months ago
    Thx, I will try it out and play around with the conversion into an Observable now. As for the request: I use Ktor 2.0.0 as the HTTP client in my project. Now I need to read server-sent events. I googled "KTOR SSE" and found this issue, which then led me here. So I copied this code and modified it a bit to make it work (at least I thought so ^^). I mean, I can use my app normally (the UI is responsive and I can execute other actions on IO), and every time the server sends an event (caused by another app) it appears in the console. So I do a one-time request and get all the events. Isn't that "listening to the server"? This is how I create the httpStatement:
    val httpStatement = localClient.prepareGet("https://${connectionInfo.bridgeIP}/eventstream/clip/v2") {
        applicationKeyHeader()
        header("Accept", "text/event-stream")
    }
    Arkadii Ivanov

    6 months ago
    I have never seen GET requests used this way. Usually GET requests are one-time requests: you connect to the server, it responds with data, you read the data fully and then use it. Listening with GET requests is also possible, but this is usually done via long polling: you connect to the server and wait for the response, then read it fully, close the request and execute a new one. So one command per request. In your example, you are reading bytes from the channel. I'm not sure how it works under the hood, but I would check whether there is a guarantee that every read from the channel is complete and contains the whole message. I mean, a command's body may be big and might be split across multiple packets.
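The concern about incomplete reads can be illustrated with a small buffer that only hands out complete lines. This is a minimal sketch, not Ktor or Reaktive API: LineAssembler is a hypothetical helper, and it assumes UTF-8 text with LF or CRLF line endings. It buffers raw bytes rather than decoded strings so that a multi-byte character split across two reads is not corrupted.

```kotlin
// Hypothetical helper (not part of Ktor or Reaktive): collects raw chunks and
// returns only the lines that are already complete. Assumes UTF-8 text
// terminated by "\n" or "\r\n"; a lone "\r" terminator is not handled here.
class LineAssembler {
    // Bytes received so far that do not yet end in a line terminator.
    private var pending = ByteArray(0)

    fun feed(chunk: ByteArray): List<String> {
        pending += chunk
        val lines = mutableListOf<String>()
        var start = 0
        for (i in pending.indices) {
            if (pending[i] == '\n'.code.toByte()) {
                var end = i
                // Drop the '\r' of a "\r\n" terminator.
                if (end > start && pending[end - 1] == '\r'.code.toByte()) end--
                lines += pending.decodeToString(start, end)
                start = i + 1
            }
        }
        // Keep the unterminated tail for the next call.
        pending = pending.copyOfRange(start, pending.size)
        return lines
    }
}

fun main() {
    val assembler = LineAssembler()
    val bytes = "data: héllo\n\n".encodeToByteArray()
    // Split in the middle of the two-byte 'é' to simulate an unlucky read boundary.
    println(assembler.feed(bytes.copyOfRange(0, 8)))          // no complete line yet
    println(assembler.feed(bytes.copyOfRange(8, bytes.size))) // the full line, then the blank line
}
```

With something like this between the channel reads and emitter.onNext, the downstream would only see whole lines, regardless of how the bytes were split in transit.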
    trashcoder

    6 months ago
    I just checked the example curl call provided by the server's manufacturer in the documentation about "how to receive events". And guess what?! You are right! There is no GET there:
    curl --insecure -N -H 'hue-application-key: <appkey>' -H 'Accept: text/event-stream' https://<ipaddress>/eventstream/clip/v2
    So maybe I should really rethink this approach... 🙈
    Arkadii Ivanov

    6 months ago
    Hmm, looks like this is actually a documented thing. You just have to supply the Accept: text/event-stream header and the server will stream events back. https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
    So yeah, sorry for the confusion.
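For reference, the event stream format described on the MDN page is line based: fields such as event, data and id, comment lines starting with a colon, and a blank line that ends each event. Below is a rough sketch of turning already-split lines into events. SseEvent and parseSseLines are made-up names for illustration, the sample payloads are invented, and the retry field is ignored.

```kotlin
// Illustrative type, not taken from any library.
data class SseEvent(val event: String, val data: String, val id: String?)

// Groups text/event-stream lines into events. A blank line dispatches the
// event collected so far; lines starting with ':' are comments (keep-alives).
fun parseSseLines(lines: List<String>): List<SseEvent> {
    val events = mutableListOf<SseEvent>()
    var eventType = ""
    var lastId: String? = null
    val data = mutableListOf<String>()
    for (line in lines) {
        when {
            line.isEmpty() -> {
                // Events without any data field are not dispatched.
                if (data.isNotEmpty()) {
                    events += SseEvent(eventType.ifEmpty { "message" }, data.joinToString("\n"), lastId)
                }
                eventType = ""
                data.clear()
            }
            line.startsWith(":") -> Unit // comment line, ignored
            else -> {
                val name = line.substringBefore(':')
                // A single leading space after the colon is not part of the value.
                val value = line.substringAfter(':', "").removePrefix(" ")
                when (name) {
                    "event" -> eventType = value
                    "data" -> data += value
                    "id" -> lastId = value
                    else -> Unit // "retry" and unknown fields are skipped in this sketch
                }
            }
        }
    }
    return events
}

fun main() {
    val lines = listOf(
        ": keep-alive",
        "id: 1",
        "data: first",
        "data: second",
        "",
        "event: update",
        "data: {}",
        "",
    )
    parseSseLines(lines).forEach(::println)
}
```

Each parsed event could then be passed downstream as one item, instead of emitting whatever bytes a single read happened to return.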
    trashcoder

    6 months ago
    No worries. I have a constant level of confusion. It is all part of the learning process 😉