elizarov
12/21/2016, 5:29 PM
asyncRxGenerate to create cold observables (a new coroutine for each subscriber). So now you can trivially implement any complex observable transformation with purely imperative code, and it will work with proper back-pressure support. For example:
fun square(observable: Observable<Int>) = asyncRxGenerate {
    for (x in observable)
        emit(x * x)
}
While this particular case is already handled by Observable.map, your logic can be as complex as you want, without wrapping your head around how to express it with combinators. You can also invoke other arbitrary suspending functions from inside asyncRxGenerate (await a future, sleep, etc.).
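To give a feel for the "imperative instead of combinators" point, here is a rough sketch that uses only the Kotlin standard library. It is an analogue, not asyncRxGenerate itself: sequence { ... } stands in for the builder and yield stands in for emit, so it is synchronous and pull-based and says nothing about Rx back-pressure. The helper name runLengths is made up for illustration.

```kotlin
// Hypothetical helper, not part of any library: collapses consecutive equal
// values into (value, runLength) pairs. The state is kept in plain local
// variables, which is what makes this awkward to express with combinators.
fun runLengths(source: Sequence<Int>): Sequence<Pair<Int, Int>> = sequence {
    var current: Int? = null   // value of the run being counted, null before the first item
    var count = 0              // length of the current run

    for (x in source) {
        if (x == current) {
            count++
        } else {
            // A new run starts: flush the previous one, if there was one.
            val previous = current
            if (previous != null) yield(previous to count)
            current = x
            count = 1
        }
    }

    // Flush the final run once the source is exhausted.
    val last = current
    if (last != null) yield(last to count)
}
```

For example, runLengths(sequenceOf(1, 1, 2, 3, 3, 3)).toList() gives [(1, 2), (2, 1), (3, 3)]. Like a cold observable, the block runs from scratch each time the resulting sequence is iterated.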