nhaarman
09/13/2017, 8:39 AM
`x` values emitted by `bar` are stored in some state, and `foo` emits `y` values for each value `bar` emits based on this state. My solution was to call `emitter.onNext` `y` times, and store the state inside the `create` block.
Now I realize that a `concatMap` like this does the same:
// receiver is `bar`
fun Observable<Int>.foo(): Observable<Int> {
    var state = emptyState()
    return concatMap { value ->
        val values: List<Int> = calculateValues(value, state)
        state = state.updateWith(value)
        // emit the calculated list, not the single incoming value
        Observable.fromIterable(values)
    }
}
This of course keeps a single stream including error and terminal messages 🙂
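For anyone who wants to run this, here is a minimal self-contained sketch of the same idea, assuming RxJava 2 (`io.reactivex.Observable`). The message does not show `State`, `emptyState`, `calculateValues`, or `updateWith`, so the definitions below are hypothetical stand-ins. One deliberate difference from the snippet above: the body is wrapped in `Observable.defer`, so each subscription gets its own `state` instead of sharing the one created when `foo()` is called.

```kotlin
import io.reactivex.Observable

// Hypothetical state: remembers the values seen so far.
data class State(val seen: List<Int> = emptyList()) {
    fun updateWith(value: Int): State = copy(seen = seen + value)
}

fun emptyState(): State = State()

// Hypothetical rule: emit the running sum once per value seen so far,
// counting the current one.
fun calculateValues(value: Int, state: State): List<Int> {
    val sum = state.seen.sum() + value
    return List(state.seen.size + 1) { sum }
}

// receiver is `bar`
fun Observable<Int>.foo(): Observable<Int> = Observable.defer {
    // Created inside defer, so every subscriber starts from an empty state.
    var state = emptyState()
    this@foo.concatMap { value ->
        val values = calculateValues(value, state)
        state = state.updateWith(value)
        Observable.fromIterable(values)
    }
}

fun main() {
    Observable.just(1, 2, 3)
        .foo()
        .subscribe { println(it) } // prints 1, 3, 3, 6, 6, 6 (one per line)
}
```

Without the `defer`, subscribing twice to the same `foo()` result would continue from the state left behind by the first subscription, which may or may not be what is wanted.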