>requires one to be async with respect to the o...
# rx
a
requires one to be async with respect to the other
Not exactly async… Because of how it works internally (we use it with 1.x, but 2.x looks same in that regard) you will be able to use it even in cases with no hot observables involved
Observable.just(1, 2).current().test().assertValue(2)
. Basically what happens under the hood is that
withLatestFrom()
subscribes to
other
first (this is very important), and only then subscribes to upstream. When upstream emits a value (which we do via
Observable.just(Unit)
so it's synchronous for the subscriber) operator combines upstream value with latest of
other
value. If
other
is able to emit value synchronously (right-away when subscriber subscribes) then operator will have `other`'s value to combine with before even synchronous upstream will emit its item. See
<https://github.com/ReactiveX/RxJava/blob/7c95808f077537428f2ae80fffd15e2848a2de31/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFrom.java#L43>
That allows
current()
operator to work with both cold and hot upstreams correctly.
A single observable cannot emit a latest or current without being hot, simply by definition
I can only agree with this if you're talking about bad operator name we chose, but as pointed above, it works without requiring ustream to be hot.
Or, put another way, normal subscription to the observable is already how you get the latest or current value
This is correct. However there is an important difference between possible implementations of getting latest or current value by "normal subscription". If upstream emits value with some time delay,
current()
will not emit it to you, to achieve such an effect you would basically need to write own implementation of
withLatestFrom()
, otherwise you may end up replicating
take(1)
instead which as pointed few messages above may lead to wrong behavior in your business logic by reacting on event that "unexpectedly" (for your business logic) arrives at some point in future.