# reaktive

Nikola Milovic

09/28/2021, 11:43 AM
I have a ChatClient in my common code (an expect class, currently implemented only in JS), and I am trying to add a method for subscribing to new messages that come from the WebSocket, plus a way to clear the subscription when the subscriber gets cleaned up (I am using stores from MVIKotlin). I am really lost at the moment and cannot navigate the library to find what I need. This is what I am trying to achieve:
// inside jsMain actual ChatClient
    private val observable: Observable<MessageModel> = // ???

    actual fun subscribeToNewMessages(/* an observer, or something else to subscribe to the stream of messages */) {
        // observable.subscribe(...) with the observer
    }

    // actual fun that somehow clears the subscription when necessary

    socket.on("received_message") { mess ->
        // somehow send mess to the observers of observable
    }
Hopefully this makes sense

Arkadii Ivanov

09/28/2021, 1:00 PM
Hello. Based on what I understand, you can just expose an Observable from your ChatClient:
val observable: Observable<MessageModel> =
    observable<MessageModel> { emitter ->
        socket.on("received_message") { mess ->
            emitter.onNext(mess) // Forward each incoming message to the subscribers
        }
        emitter.setCancellable {
            // Called when the observable is unsubscribed (disposed)
            // Cancel the socket subscription using its API
            // Usually the "on" method should return a cancellation handle
        }
    }.share() // Add this operator to share the subscription between multiple subscribers, if needed
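
On the consuming side, a minimal sketch of subscribing and later clearing the subscription might look like the following. It assumes Reaktive's `subscribe` extension, which returns a `Disposable`. The `PublishSubject` only stands in for the socket-backed observable, and `MessageModel` (as declared here) and `collectUntilDisposed` are hypothetical names used for illustration.

```kotlin
import com.badoo.reaktive.disposable.Disposable
import com.badoo.reaktive.observable.subscribe
import com.badoo.reaktive.subject.publish.PublishSubject

// Hypothetical stand-in for the real message type
data class MessageModel(val text: String)

fun collectUntilDisposed(): List<String> {
    // Stand-in for ChatClient.observable; a real client would wrap the socket instead
    val messages = PublishSubject<MessageModel>()
    val received = mutableListOf<String>()

    // subscribe returns a Disposable, which is the handle used to clear the subscription
    val disposable: Disposable = messages.subscribe { message ->
        received += message.text
    }

    messages.onNext(MessageModel("hello"))   // emitted while subscribed
    disposable.dispose()                     // clear the subscription
    messages.onNext(MessageModel("ignored")) // emitted after dispose()

    return received
}

fun main() {
    println(collectUntilDisposed())
}
```

The `dispose()` call would go wherever the subscriber is cleaned up. With the observable built as above, disposing the last subscriber is also what triggers the `setCancellable` block.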