https://kotlinlang.org logo
#rx
Title
# rx
d

dimsuz

06/18/2020, 10:26 AM
Hi! I'm excersising in writing Rx-based unidirectional flow where each state reduce is a Single. Usually this is done with scan, but when Single is involved it's a bit tricky. I managed to get it working like this:
Copy code
val events = Observable.just("event1", "event2", "event3")
  val initialState = Single.just(emptyList<String>())
  // given a current state produces next state's Single
  val reducerFactory = { currentState: List<String>, event: String ->
    Single.fromCallable { /* do work */ currentState.plus(event) }
  }
  events
    .scan(
      initialState,
      { currentStateSingle, event ->
        currentStateSingle
          .flatMap { currentState -> reducerFactory( currentState, event) }
          // required to avoid resubscription on all previously emitted single's on each new scan iteration
          .cache() // (1)
      }
    )
    .flatMapSingle { it }
    .subscribe { state -> println("state updated to $state") }
What bothers me is that each event will add a new "flatMap" (marked as
(1)
) to an existing chain and it will grow unbounedly consuming memory and never disposing intermediate
Singles
while after they emitted a new state they are not needed and can be safely disposed. The only other option I see is to have
state
being held atomically outside the rx chain and chainging it from within the operators, but this feels a bit messy.
Ugh, this is quite a post, perhaps I'd better send it to StackOverflow.
3 Views