https://kotlinlang.org logo
Title
a

Arkadii Ivanov

02/03/2020, 11:05 AM
Hello guys. I started implementing an interop module for Reaktive. Checked existing implementations for RxJava2 and Reactor. The CororutineContext->Scheduler converter appears to be implemented incorrectly. E.g. "tasks scheduled on a Worker are guaranteed to be executed sequentially and in a non-overlapping fashion", but here they are being executed in parallel: https://github.com/arrow-kt/arrow/blob/397b84f1366662c8a0f4c3ea02717e1c74ccc990/modules/fx/arrow-fx-rx2/src/main/kotlin/arrow/fx/rx2/CoroutineContextRx2Scheduler.kt#L29. And I believe same issue in Reactor module. So I'm wondering is there any reason of such an implementation or it's just a bug?
s

simon.vergauwen

02/03/2020, 11:08 AM
Hi @Arkadii Ivanov, That’s awesome. I’d love to pick your brain about MPP for Arrow Fx 😅. That’s probably a bug in that case, the RxJava & Reactor implementations were done on a best-effort basis by me & @pakoito. If you see any other room for improvement that’d be awesome!
❤️ 1
Reactor is also still missing
Concurrent
implementation for all its types.
Bracket
which composes 3 of the same Rx types together while guaranteeing resource-safety was especially hard. I was surprised neither frameworks exposes such an operation, they only support side-effecting onces.
Looking very much forward to your contribution! Reaktive is awesome!
a

Arkadii Ivanov

02/03/2020, 11:11 AM
Cool! On the other hand I'm completely new to Arrow lib
Will doo all my best
s

simon.vergauwen

02/03/2020, 11:11 AM
If you have any questions I’d be happy to answer all of them or jump on a call if needed 👍
👍 1
a

Arkadii Ivanov

02/03/2020, 11:12 AM
I think for correct implementation a dependency to kotlinx.coroutines would be required
s

simon.vergauwen

02/03/2020, 11:17 AM
Why would that be required?
In that case I personally would drop it from the module instead of introducing a dependency for a single combinator
a

Arkadii Ivanov

02/03/2020, 11:17 AM
From first point of view Channels of Flow will be required
So you need to start a coroutine and receive tasks one by one for execution. I will think more later, maybe there is a better solution
s

simon.vergauwen

02/03/2020, 11:19 AM
We have
Queue
etc in Arrow Fx as well which is the FP equivalent of
Channel
a

Arkadii Ivanov

02/03/2020, 11:19 AM
Also delayed tasks should be prioritized so there should be kinda PriorityQueue
We have Queue etc in Arrow Fx as well which is the FP equivalent of Channel
s

simon.vergauwen

02/03/2020, 11:20 AM
Yes, sounds we should drop it for now if it involves that. Explains why
Schedulers.computation()
was acting weird but
<http://Schedulers.io|Schedulers.io>()
wasn’t
a

Arkadii Ivanov

02/03/2020, 11:20 AM
Corotuine should suspend waiting for a task
s

simon.vergauwen

02/03/2020, 11:20 AM
Which is true for
Queue
in Arrow Fx. It semantically blocks, or suspends.
a

Arkadii Ivanov

02/03/2020, 11:20 AM
If drop then how should I implement
continueOn
?
s

simon.vergauwen

02/03/2020, 11:21 AM
continueOn
is simply
subscribeOn
observeOn
a

Arkadii Ivanov

02/03/2020, 11:22 AM
continueOn
accepts
CoroutineContext
So we need interop
s

simon.vergauwen

02/03/2020, 11:22 AM
It changes the threading for downstream, which is
observeOn
and you can install multiple times
Right, we do need it
👍 1
a

Arkadii Ivanov

02/03/2020, 11:23 AM
E.g. we have a
Single
wrapped into
SingleK
. The last one has
continueOn
operator. I need to implement this operator.
Cool, thanks guys for your support. I'm offered 20% of my work hours to spend on open source. Plus a bit of spare time.
Will take a while)
s

simon.vergauwen

02/03/2020, 11:26 AM
Cool, will help wherever I can. I was just thinking, creating a
Scheduler
from a
CoroutineContext
shouldn’t be any different from creating a scheduler from an
ExecutorService
, isn’t it?
They also have the same priority and synchronisation problem.
a

Arkadii Ivanov

02/03/2020, 11:34 AM
Yup same problems in general. That's why RxJava2's
ExecutorScheduler
implementation is ~450 lines long. And it uses tricky queue under the hood.