# vertx
d
Aren't coroutines experimental? Does anyone use them in production without problems? I'm also making an API in Vert.x, but I'm currently using RxJava...
d
Thanks! Do you know if/how they can be used as rxjava scheduler as well? Or will I have to port rxjava code into coroutines in my vertx api...
e
I am not sure I understand what you mean by “using them as rxjava scheduler”
d
subscribeOn(RxKotlin.coroutineIO())
Subscribes on the context of a coroutine instead of a thread in the io pool... or am I completely misunderstanding the inner workings of coroutines and rxjava?
n
Didn't Roman mention on Slack a while back that Coroutines wouldn't be stabilised in Kotlin 1.2? How does that affect the Vert.x 3.5 plans?
e
They will not be broken in 1.2 either.
d
@elizarov I was looking at the kotlinx-coroutines-rx1, it's SO different from regular rx programming, it's not just a drop-in replacement for someone with a large rx codebase. Is there really no way to implement it as an rx scheduler... or am I really asking a ridiculous noob question?🤔
n
Rx isn't a silver bullet for solving concurrency problems and is a Kitchen Sink solution that is overkill when only used to handle concurrency, hence multiple high level concurrency models (eg Actor, CSP, Async/Await) exist with their own advantages/drawbacks that ONLY cover concurrency. When it comes to the Unix design philosophy (Do one thing and do it well - https://en.wikipedia.org/wiki/Unix_philosophy#Do_One_Thing_and_Do_It_Well ) Rx is the direct opposite of that (isn't designed well).
e
`kotlinx-coroutines-rx1` was not meant as a replacement for Rx at all. It augments Rx with the ability to iterate over reactive streams from coroutines, which enables you to encode complex business logic easily and directly.
d
@napperley The main reason that I use RxJava so much is because it does a great job in separating different layers of application logic (like in clean architecture and MVP), and I get the ability to add concurrency at any step of the process when needed without having to refactor everything...
@elizarov I didn't mean that `kotlinx-coroutines-rx1` should replace rx, but rather that it should leverage the existing rxjava Schedulers model, so that, along with the possibility to use thread pools when needed, it could 'schedule' operations to coroutines... I don't know if that's possible...
e
But you already can. There is an extension function `fun Scheduler.asCoroutineDispatcher()` that converts an Rx scheduler into a coroutine dispatcher.
So, you can use your existing Rx schedulers to execute your coroutines.
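What "executing coroutines on an existing scheduler" means can be sketched with the Kotlin standard library alone. This is not how `asCoroutineDispatcher()` is implemented in kotlinx-coroutines-rx; it is a minimal illustration under assumptions, where a plain `java.util.concurrent.Executor` stands in for the Rx scheduler and `ExecutorInterceptor` and `threadNameInside` are hypothetical names invented for the example.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical stand-in for a dispatcher: every resumption of the coroutine
// is handed to the given executor instead of running on the caller's thread.
class ExecutorInterceptor(private val executor: Executor) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                executor.execute { continuation.resumeWith(result) }
            }
        }
}

// Starts a coroutine with the interceptor in its context and reports
// which thread actually ran the coroutine body.
fun threadNameInside(executor: Executor): String {
    val done = CountDownLatch(1)
    var name = ""
    val body: suspend () -> String = { Thread.currentThread().name }
    body.startCoroutine(Continuation<String>(ExecutorInterceptor(executor)) { result ->
        name = result.getOrThrow()
        done.countDown()
    })
    done.await()
    return name
}

fun main() {
    // Assumption: this single-thread pool plays the role of an existing IO scheduler.
    val pool = Executors.newSingleThreadExecutor { r ->
        Thread(r, "existing-io-pool").apply { isDaemon = true }
    }
    println(threadNameInside(pool)) // prints existing-io-pool
    pool.shutdown()
}
```

The coroutine body is ordinary code; only the context element decides which thread resumes it, which is why an existing pool can be reused without touching the body.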
d
I'm not sure I understand, I have to implement coroutines and feed them to rx operators and use the asCoroutineDispatcher(), or can I just use the scheduler, and it'll take care of putting the operators into the context of coroutines?
The first possibility involves code refactoring, the second makes it possible to switch between the threading model and the coroutine model as needed...
e
Sorry, but I fail to grasp your question. You don’t have to do anything. You use Rx when it works for you. But if you need to write some complex logic that becomes too hard to express in Rx, you can simply use interop operators from `kotlinx-coroutines-rx` and write this code in a simpler way, while still using all your Rx schedulers.
Coroutines are just a tool for easily writing your async code. There is no magic. You can write all of that w/o coroutines by hand. However, in many cases it involves writing a lot of spaghetti code with callbacks and closures. Coroutines just let you avoid all of them, and let the compiler take care of it.
d
So I can do:
service.getUsers()
          .map(::mapJsonToUser)
          .subscribeOn(Scheduler.asCoroutineDispatcher())
          .subscribe(subscriber)
And not have to write ::mapJsonToUser as suspend fun nor have getUsers be suspend fun, and the coroutines-rx Scheduler will automatically run them as coroutines?
BTW @elizarov, I appreciate the clarifications, I'm hopefully starting to understand things... but this clarification is important, because I need the flexibility to easily switch implementation from threads to coroutines w/o refactoring the whole codebase... You know, bosses are always going to stay IMPATIENT ! 😁
e
You cannot `subscribeOn(coroutineDispatcher)`. You subscribe on an Rx scheduler.
Why would you want to subscribe on coroutine dispatcher?
d
To keep the current codebase, just changing the scheduler instead of refactoring everything
But while avoiding starting threads for things that can be done w/ coroutines...
e
I think you totally misunderstand what coroutines are supposed to do. I suggest going back to basics first. Have you read “The Guide to kotlinx.coroutines”? You can also watch my GeekOut presentation
You cannot “just change scheduler”. To use coroutines you have to have some “suspending functions”. Without suspending functions it is all useless.
d
Yes, I did read the docs on the basics, I just don't know if there is a SIMPLE way they can integrate into an existing Rx codebase. So the ::mapJsonToUser in the example above would have to be `suspend fun mapJsonToUser(..):..`, then how would I use `asCoroutineDispatcher`?
I'm going through your https://www.slideshare.net/elizarov/introduction-to-kotlin-coroutines. It's great! I see that coroutines are a simpler alternative to Rx. But the stdlib map function is not suspended after getting a result from the api, so even if in launch() it'll still be blocking? Unless the lambda passed to map is a `suspend fun`?
e
For example, you have a `fun foo(): Observable<T>` that returns Ts and `suspend fun bar(item: T)` that does some complex async stuff on T, and you want to invoke `bar` on each item from `foo`, so you do `launch(context) { foo().consumeEach { bar(it) } }` and it just works.
If you pass a suspend fun to `map`, then the map itself will suspend.
d
So that's the right way to do it if map has to do some io for each iteration element?
`launch(context) { foo().consumeEach { bar(it) } }` actually subscribes to the observable?
e
Yes. `consumeEach` does.
It subscribes, processes all items, and closes the subscription. I mean, you can check out its docs (and there is also a link to the source): https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/consume-each.html
Oops… source link is broken…
d
Thanks! Now I got what this lib does! It basically replaces the subscribe callback with coroutines!
e
Yes
The key here is that you are no longer limited to simple transformation pipelines (map, flatMap, etc). You can do any complex logic: branching, loops, etc.
Easily, that is.
d
But the actual observable chain still runs in whatever threads it was subscribed on.
e
But if all you do are simple pipelines, then it will not get any easier or better than Rx
The observable runs wherever it is configured to run
However, you choose where you want your coroutine to be executed by passing `context`
d
What do you mean by complex logic? In the coroutine 'subscriber'?
e
For example, if you want to subscribe to an observable and update your UI, you just do `launch(UI) { myObservable.consumeEach { updateUI(it) } }`
Inside consumeEach. You can do, for example, `if (condition) doOneAsyncAction() else doOtherAsyncAction()`. This kind of logic is non-trivial to express with Rx combinators
d
Oh, but that's the downside here. If the actual `myObservable` is doing some kind of network request, it can't be run as suspendable, it has to run on an rx scheduler.. just the result gets run as a coroutine...
Or the `myObservable` has to be rewritten as a `suspend fun`, and then ditch rx...
Now I know why @jw was talking about rewriting rx for kotlin... like http://akarnokd.blogspot.co.il/2017/09/rewriting-rxjava-with-kotlin-coroutines.html
e
You don't have to rewrite. That is what integration libs are for. You can convert back and forth as needed.
No problem with your observable doing network request.
No need to rewrite. Just use it.
d
Even with no subscribeOn? It's not suspendable
e
It is async. That's enough. Integration libs are converting it into suspending.
That is why we provide all those integration modules. So that you can convert your other async stuff into suspendable w/o rewriting it.
d
So it's enough to run everything in launch() even with no suspend functions?
e
Of course not. You use suspending funs from integration libs. Like `consumeEach`. It provides conversion.
d
Or is it because i'm subscribing using a suspendable function that the network request automatically is suspendable
e
Converts your observable into a suspending fun.
Yes.
d
It's like providing a callback to the network function?
That runs when the result arrives
e
Not automagically. There is complex code behind consumeEach doing it, which you don't have to write yourself.
d
So the whole observable runs on the CommonPool when using consumeEach?
e
Your coroutine will suspend until the network response arrives and then resume with the result.
Observable runs wherever you configure it to run via Rx operators.
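The suspend-then-resume behaviour described here can be sketched with the Kotlin standard library alone. This is not the code behind `consumeEach` (which also has to deal with multiple items, errors and unsubscription); it is a minimal single-value illustration under assumptions, where `FakeClient`, `awaitUser` and `runDemo` are hypothetical names and the "network response" is delivered by hand.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback-style client standing in for any async source.
class FakeClient {
    private var pending: ((String) -> Unit)? = null

    // Registers the callback and returns immediately; nothing blocks.
    fun fetchUser(callback: (String) -> Unit) {
        pending = callback
    }

    // Simulates the response arriving later.
    fun deliver(response: String) {
        val callback = pending
        pending = null
        callback?.invoke(response)
    }
}

// The bridge: suspend the coroutine, then resume it from the callback.
suspend fun FakeClient.awaitUser(): String = suspendCoroutine { cont ->
    fetchUser { user -> cont.resume(user) }
}

// Runs one coroutine against the fake client and records the order of events.
fun runDemo(): List<String> {
    val client = FakeClient()
    val log = mutableListOf<String>()
    val body: suspend () -> Unit = {
        log.add("before request")
        val user = client.awaitUser() // suspends here, no thread is blocked
        log.add("resumed with $user")
    }
    body.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { it.getOrThrow() })
    log.add("caller continues while suspended")
    client.deliver("alice")
    return log
}

fun main() {
    println(runDemo())
    // prints [before request, caller continues while suspended, resumed with alice]
}
```

The async source itself is untouched; only the thin `awaitUser` wrapper is a suspending function, which is the role the integration modules play for Rx types.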
d
If nothing configured, it defaults to CommonPool?
e
It depends on your code.
There is no default in Rx. Default is operator-dependent.
So it depends on your code/libs. Like, Retrofit request will run in its OkHttp IO threads by default. It all depends.
d
Default is usually current thread unless doing timer or interval operator..
e
IO does not run from current thread by default. Only simple stuff like number sequences does.
d
But in vert.x I think there is no default with the common sql lib
?
e
I have no idea. It all depends in Rx.
Check its source/docs to know.
Does not matter with coroutines anyway. They run wherever you tell them to run.
d
The current thread within a launch() or async() is what?
If not suspending function.
e
I cannot retell the guide in this thread. It is in, like, the first or second example in the "Context" section.
d
Ok, thanks a lot for all the help! 😃
e
I would have sent you a specific link, but I am on the road
d
Ok, if/when you have a chance, thanks in advance!! That's the last part I'm less clear on, and the docs in Kotlin site don't seem to say... I still go through all your slide presentations, they're an eye opener! 🙂