Hello guys. I started implementing an interop modu...
# arrow-contributors
a
Hello guys. I started implementing an interop module for Reaktive. Checked existing implementations for RxJava2 and Reactor. The CororutineContext->Scheduler converter appears to be implemented incorrectly. E.g. "tasks scheduled on a Worker are guaranteed to be executed sequentially and in a non-overlapping fashion", but here they are being executed in parallel: https://github.com/arrow-kt/arrow/blob/397b84f1366662c8a0f4c3ea02717e1c74ccc990/modules/fx/arrow-fx-rx2/src/main/kotlin/arrow/fx/rx2/CoroutineContextRx2Scheduler.kt#L29. And I believe same issue in Reactor module. So I'm wondering is there any reason of such an implementation or it's just a bug?
s
Hi @Arkadii Ivanov, That’s awesome. I’d love to pick your brain about MPP for Arrow Fx 😅. That’s probably a bug in that case, the RxJava & Reactor implementations were done on a best-effort basis by me & @pakoito. If you see any other room for improvement that’d be awesome!
❤️ 1
Reactor is also still missing
Concurrent
implementation for all its types.
Bracket
which composes 3 of the same Rx types together while guaranteeing resource-safety was especially hard. I was surprised neither frameworks exposes such an operation, they only support side-effecting onces.
Looking very much forward to your contribution! Reaktive is awesome!
a
Cool! On the other hand I'm completely new to Arrow lib
Will doo all my best
s
If you have any questions I’d be happy to answer all of them or jump on a call if needed 👍
👍 1
a
I think for correct implementation a dependency to kotlinx.coroutines would be required
s
Why would that be required?
In that case I personally would drop it from the module instead of introducing a dependency for a single combinator
a
From first point of view Channels of Flow will be required
So you need to start a coroutine and receive tasks one by one for execution. I will think more later, maybe there is a better solution
s
We have
Queue
etc in Arrow Fx as well which is the FP equivalent of
Channel
a
Also delayed tasks should be prioritized so there should be kinda PriorityQueue
Copy code
We have Queue etc in Arrow Fx as well which is the FP equivalent of Channel
s
Yes, sounds we should drop it for now if it involves that. Explains why
Schedulers.computation()
was acting weird but
<http://Schedulers.io|Schedulers.io>()
wasn’t
a
Corotuine should suspend waiting for a task
s
Which is true for
Queue
in Arrow Fx. It semantically blocks, or suspends.
a
If drop then how should I implement
continueOn
?
s
continueOn
is simply
subscribeOn
observeOn
a
continueOn
accepts
CoroutineContext
So we need interop
s
It changes the threading for downstream, which is
observeOn
and you can install multiple times
Right, we do need it
👍 1
a
E.g. we have a
Single
wrapped into
SingleK
. The last one has
continueOn
operator. I need to implement this operator.
Cool, thanks guys for your support. I'm offered 20% of my work hours to spend on open source. Plus a bit of spare time.
Will take a while)
s
Cool, will help wherever I can. I was just thinking, creating a
Scheduler
from a
CoroutineContext
shouldn’t be any different from creating a scheduler from an
ExecutorService
, isn’t it?
They also have the same priority and synchronisation problem.
a
Yup same problems in general. That's why RxJava2's
ExecutorScheduler
implementation is ~450 lines long. And it uses tricky queue under the hood.