# coroutines
l
I am testing Flow with Observables, having this code:
runBlocking(Dispatchers.IO) {
    val myList = mutableListOf(1).asObservable()

    launch {
        myList.asFlow().collect { println("A $it") }
    }

    delay(1000)

    myList += 2

    launch {
        myList.asFlow().collect { println("B $it") }
    }

    delay(1000)
}
Which produces:
A 1
B 1
B 2
Is there some sort of non-terminal `collect` that would keep listening for further changes, so that I would see `A 2` in the output when `myList` gets updated?
z
This isn’t an RxJava or Flow issue; you’re asking for a version of `MutableList` that sends change notifications. One thing you could do is use a `BehaviorSubject<List<Int>>` and push a new list in whenever you add, remove, or change items.
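A minimal sketch of that `BehaviorSubject` idea, assuming RxJava 3 is on the classpath (the thread does not say which RxJava version is in use). The `demo` function and the `out` list are hypothetical names used only for illustration. Each change is published as a whole new immutable list, so an early subscriber also sees later updates:

```kotlin
import io.reactivex.rxjava3.subjects.BehaviorSubject

// Hypothetical demo: records what each subscriber receives.
fun demo(): List<String> {
    val out = mutableListOf<String>()

    // The subject always holds the latest snapshot of the list.
    val lists = BehaviorSubject.createDefault(listOf(1))

    // "A" subscribes early and receives the current value immediately.
    val a = lists.subscribe { out += "A $it" }

    // "Adding" an element means pushing a new list built from the old one.
    lists.onNext(lists.value!! + 2)

    // "B" subscribes late and only receives the latest snapshot.
    val b = lists.subscribe { out += "B $it" }

    a.dispose()
    b.dispose()
    return out
}
```

With this approach subscribers receive whole lists rather than individual elements, which sidesteps the question of how to represent removals.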
l
Well, I thought that `.asObservable()` would do exactly this. Or at least for JavaFX components it works as expected: they update automatically when the list changes (if they are bound to it).
z
Ah, I assumed `asObservable` turned a list of `[1]` into an `Observable` that just emitted `1` and then completed. Where does `asObservable` come from?
l
I think it comes with the TornadoFX lib 😊
z
ah, not familiar with that one. sorry for the confusion!
I read the `ObservableList` docs and I think I understand what you meant now. It seems like the simplest thing to do would be to create a `Flow<ListChangeListener.Change<T>>` and then handle changes manually. I'm not sure how you would convert that into a simple flow of elements though: you can emit added elements, but how would you handle changed or removed elements? E.g.
val changes = flowViaChannel(bufferSize = UNLIMITED) { ch ->
  val listener = ListChangeListener {
    ch.offer(it)
  }
  myList.addListener(listener)
  ch.invokeOnClose { 
    myList.removeListener(listener)
  }
}
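One possible way to handle removals, sketched with `callbackFlow` from current kotlinx.coroutines (an assumption: the snippet above uses the older `flowViaChannel` builder). `ListEvent`, `events`, and `demo` are hypothetical names, not part of JavaFX or kotlinx.coroutines. The listener copies each change into an immutable event right away, because a JavaFX `Change` object reads from the source list and becomes invalid once the list changes again:

```kotlin
import javafx.collections.FXCollections
import javafx.collections.ListChangeListener
import javafx.collections.ObservableList
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical event type: an immutable copy of what changed.
sealed class ListEvent<out T> {
    data class Added<T>(val items: List<T>) : ListEvent<T>()
    data class Removed<T>(val items: List<T>) : ListEvent<T>()
}

// Hypothetical extension: emits one event per added or removed range.
// A replacement shows up as Removed followed by Added; permutations and
// in-place updates are ignored in this sketch.
fun <T> ObservableList<T>.events(): Flow<ListEvent<T>> {
    val source = this
    return callbackFlow<ListEvent<T>> {
        val listener = ListChangeListener<T> { change ->
            while (change.next()) {
                if (change.wasRemoved()) trySend(ListEvent.Removed(change.removed.toList()))
                if (change.wasAdded()) trySend(ListEvent.Added(change.addedSubList.toList()))
            }
        }
        source.addListener(listener)
        // Runs when the collector stops, so the listener does not leak.
        awaitClose { source.removeListener(listener) }
    }.buffer(Channel.UNLIMITED)
}

// Usage: mutate the list while a collector is already listening.
fun demo(): List<ListEvent<Int>> = runBlocking {
    val list = FXCollections.observableArrayList(1)
    launch {
        delay(100)
        list.add(2)       // reported as Added([2])
        list.removeAt(0)  // reported as Removed([1])
    }
    list.events().take(2).toList()
}
```

Everything here runs on the single `runBlocking` thread; `ObservableList` is not thread-safe, so mutating it from another dispatcher would need extra care.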
l
Hmm, interesting question about removal 😁