I’ve called it `asyncRxGenerate` to create cold ob...
# coroutines
e
I’ve called it
asyncRxGenerate
to create cold observables (new coroutine for each subscriber). So now you can trivially implement any complex observable transformation with a purely imperative code and it will work with all the proper back-pressure support, for example:
Copy code
fun square(observable: Observable<Int>) = asyncRxGenerate {
    for (x in observable)
        emit(x * x)
}
while this case, in particular, is handled by
Observable.map
, you can have as complex logic as you want, without wrapping your head on how to express it with combinators. You can also invoke other arbitrary suspending functions from inside of
asyncRxGenerate
(await for some future, sleep, etc)