dimsuz
06/18/2020, 10:26 AMval events = Observable.just("event1", "event2", "event3")
val initialState = Single.just(emptyList<String>())
// given a current state produces next state's Single
val reducerFactory = { currentState: List<String>, event: String ->
Single.fromCallable { /* do work */ currentState.plus(event) }
}
events
.scan(
initialState,
{ currentStateSingle, event ->
currentStateSingle
.flatMap { currentState -> reducerFactory( currentState, event) }
// required to avoid resubscription on all previously emitted single's on each new scan iteration
.cache() // (1)
}
)
.flatMapSingle { it }
.subscribe { state -> println("state updated to $state") }
What bothers me is that each event will add a new "flatMap" (marked as (1)
) to an existing chain and it will grow unbounedly consuming memory and never disposing intermediate Singles
while after they emitted a new state they are not needed and can be safely disposed. The only other option I see is to have state
being held atomically outside the rx chain and chainging it from within the operators, but this feels a bit messy.