If I use the `fun CompletionStage<V>.asIO():...
# arrow
p
If I use the
fun CompletionStage<V>.asIO(): IO<V> = IO.async
then I won't be able to have
fun <V> startThings(a: Async<V>) = object : MemSource<V>, Async<V> by a
. What am I missing that connects
IO<F>
to
Async<F>
?
s
Why wouldn’t you be able to have both?
It’s
Async<ForIO>
and
IO<A>
btw, not sure if that clears anything up.
Async<F>
defines behavior/logic for
F
.
I.e.
Async<ForIO>.async { ... }
==
IO.async { ... }
p
Async<F>
defines behaviour/logic for
F
and
IO.async
defines the
IO<F>
.
interface Source<F> : Async<F>
interface Printer<F> : Monad<F>
interface Fetcher<F> : Source<F>, Printer<F>

    fun startThingsForIO(src: Source<ForIO>, a: Async<ForIO>) = object : Async<ForIO> by a
        , Source<ForIO> by src
        , Fetcher<ForIO> {}
For some reason it is complaining that the overrides are all missing
Object must override public open val fx: AsyncFx<ForIO> defined in arrow.kt.tests.main.startThingsForIO.<no name provided> because it inherits many implementations of it
e: .../arrow-kt-tests/src/main/kotlin/arrow/kt/tests/Problem.kt: (58, 65): Object must override public open fun <A> Ref(a: A): Kind<ForIO, Ref<ForIO, A>> defined in arrow.kt.tests.main.startThingsForIO.<no name provided> because it inherits many implementations of it
apparently if
Source<F>
extends
Async<F>
and the
object :
also extends
Async<F>
the compiler doesn't know which one to choose.
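The ambiguity described above can be reproduced without Arrow. The sketch below uses hypothetical `Base` and `Source` interfaces as stand-ins for `Async<F>` and `Source<F>` (they are not Arrow's real types) and shows two ways that should satisfy the compiler: delegating to a single object, or keeping both delegates and overriding the clashing member explicitly.

```kotlin
// Hypothetical stand-ins for Async<F> and Source<F>; these are NOT Arrow's real interfaces.
interface Base {
    fun name(): String = "base"
}

interface Source : Base {
    fun find(key: Int): String
}

class PlainBase : Base

class MemSource : Source {
    override fun name(): String = "mem"
    override fun find(key: Int): String = "user-$key"
}

// Does not compile: both delegates supply name(), so the object
// "inherits many implementations of it".
// fun broken(b: Base, s: Source) = object : Base by b, Source by s {}

// Option A: delegate only to Source, which already covers Base's members.
fun viaSingleDelegate(s: Source): Source = object : Source by s {}

// Option B: keep both delegates and pick a winner with an explicit override.
fun viaExplicitOverride(b: Base, s: Source): Source =
    object : Base by b, Source by s {
        override fun name(): String = b.name()
    }

fun main() {
    println(viaSingleDelegate(MemSource()).name())                 // mem
    println(viaExplicitOverride(PlainBase(), MemSource()).name())  // base
}
```

Option A is usually the simpler choice when the sub-interface already extends the type class, since there is then only one implementation to inherit.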
@simon.vergauwen does
fun startThingsForIO(src: Source<ForIO>) = object : Source<ForIO> by src, Fetcher<ForIO>
make sense? How do I pass it a
Source
for
CompletionStage
for example?
b
there's an arrow page on this, let me dig it up
p
Still having trouble connecting all the dots at creation time:
fun startThingsForIO(src: Source<ForIO>) = object : Source<ForIO> by src, Fetcher<ForIO> {
    }
    startThingsForIO(object : MemSource<???> {
        override val db: MutableMap<Int, User> = mutableMapOf()
    })
@pakoito any more insight you can provide? Not seeing where the
CompletionStage<V>.asIO
fits in here.
s
@Pedro Silva Hi, sorry for the late reply. I took a look at your snippets this morning and this is how things could be wired together.
In case I missed something, or if you run into anything else, I’d be happy to help you work it out 🙂
p
Hello @simon.vergauwen. Many thanks for the help. Things are a bit clearer now, but there are still things that elude me: 1. Now that we have something that can be started for
ForIO
how do we actually start it? 2. What changes would have to be made to run this with a
ForOption
? 3. Given that source has 1 method that is not a
suspend fun
this will never work with coroutines, so even if I use
CompletionStage
, or
Observable
or any other async abstraction the
suspend
modifier will still have to be present in the interface, correct?
I've updated your snippet with a minimal implementation for each interface
@simon.vergauwen if you'd prefer to talk with direct message instead of this thread, feel free to do so
once again, thank you for your efforts
s
1. Now that we have something that can be started for
ForIO
how do we actually start it? Running an
IO
can be done in a couple of different ways, but ideally it happens at the very end of your program. Better still, you would never run the
IO
yourself and instead do
fun main(arg: Array<String>): IO<Unit> = program()
. In the cases where you have to run it yourself because you don’t have
fun main(arg: Array<String>): IO<Unit>
then you should use
unsafeRunAsync
or
unsafeRunCancelable
. There is an example in the snippet attached below. 2. What changes would have to be made to run this with a
ForOption
? You cannot run this program with
Option
since it doesn’t have the power to run effects in a controlled environment. This is clear from its instances, as
Option
only has instances up to
Monad
, so there is no instance of
Async
to satisfy the constraints of your program. Option, Either, State, etc. are tools that help model certain types of data, while
IO
,
RxJava
,
Reactor
etc. provide tools for working with side effects in a controlled way, and by doing so they can offer powerful concurrency tools that help solve real business problems like recovering from errors, concurrency, parallelism, composing different effectful programs, etc. Running this program with
RxJava
,
Reactor
(or any other effect lib) could be done simply by using the
Async
instance for them. I’ve added examples for
RxJava
&
Reactor
below. 3. Given that source has 1 method that is not a
suspend fun
this will never work with coroutines, so even if I use
CompletionStage
, or
Observable
or any other async abstraction the
suspend
modifier will still have to be present in the interface, correct? I am not sure I 100% follow but you can consume
suspend
within
F
by using
Async<F>#effect
which takes a suspend lambda as its parameter. You can currently also move from
IO<A>
to
A
using a suspend function
suspend fun IO<A>.suspended(): A
. This function will also make it into the type-class hierarchy soon. It is not available there yet, but by looking at the implementation of
IO
it should be easy to write the same function for other `F`s.
Happy to help 🙂
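To make the "use the `Async` instance for them" idea concrete, here is a minimal sketch built only on the Kotlin and Java standard libraries. `Kind`, `MiniAsync`, `ForFutureK`, `FutureK`, `FutureKAsync` and this `MemSource` are hypothetical, heavily simplified stand-ins invented for illustration. They are not Arrow's API, and a real `Async<F>` instance has many more members to implement.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical, simplified stand-ins for Arrow's Kind<F, A> and Async<F>.
interface Kind<out F, out A>

interface MiniAsync<F> {
    // Lift a callback-style computation into F.
    fun <A> async(register: ((Result<A>) -> Unit) -> Unit): Kind<F, A>
}

// Witness type and wrapper so CompletableFuture can play the role of F.
class ForFutureK private constructor()
class FutureK<A>(val future: CompletableFuture<A>) : Kind<ForFutureK, A>

@Suppress("UNCHECKED_CAST")
fun <A> Kind<ForFutureK, A>.fix(): FutureK<A> = this as FutureK<A>

// The "instance": how async is implemented for CompletableFuture.
object FutureKAsync : MiniAsync<ForFutureK> {
    override fun <A> async(register: ((Result<A>) -> Unit) -> Unit): Kind<ForFutureK, A> {
        val future = CompletableFuture<A>()
        register { result ->
            result.fold({ future.complete(it) }, { future.completeExceptionally(it) })
        }
        return FutureK(future)
    }
}

data class User(val id: Int, val name: String)

interface Source<F> {
    fun findOne(key: Int): Kind<F, User>
}

// Written once against MiniAsync<F>; it never mentions CompletableFuture.
class MemSource<F>(
    private val instance: MiniAsync<F>,
    private val db: Map<Int, User>
) : Source<F> {
    override fun findOne(key: Int): Kind<F, User> = instance.async { callback ->
        val user = db[key]
        callback(
            if (user != null) Result.success(user)
            else Result.failure<User>(NoSuchElementException("no user $key"))
        )
    }
}

fun main() {
    val source: Source<ForFutureK> = MemSource(FutureKAsync, mapOf(1 to User(1, "Ada")))
    println(source.findOne(1).fix().future.get()) // User(id=1, name=Ada)
}
```

Under these assumptions, swapping the effect type means supplying a different instance object. A purely data-modelling type such as `Option` has no natural way to implement `async`, which matches the point that it only has instances up to `Monad`.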
p
@simon.vergauwen 1. Much clearer now. Let's say I would like something different than Observable or RxJava, for example, Vertx Futures or CompletionStage (as mentioned above). I would have to implement the
Async<F>
for those types and then run as
startThingsForF(VertxK.async().memSource())
or
startThingsForF(CompletionStageK.async().memSource())
, correct? 2. I suspected that. Thank you for confirming, it makes sense. 3. The goal here was to not have 2 interfaces. The current
Source<T>
interface is compatible with any
IO
(from Arrow) capable async abstraction but does it support running with coroutines? If this was only meant to work with coroutines then traditionally one would write:
interface Source<F> {
  suspend fun findOne(key: Int): Kind<F, User>
}
And then I would have an implementation with coroutines:
interface CoroutineSource<F> {
  
  // assume a coroutine context field available here

  suspend fun findOne(key: Int): Kind<F, User> {
    effect  { /* run the find on the coroutine context */ }
  }
}
With the
suspend
modifier in the interface and implementation, is this still usable with Observable and Rx, for example?
With
Source<F>
without the
suspend
modifier, I cannot have an implementation with the
suspend
modifier
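One possible way to look at this question: the interface method can stay non-`suspend` if the implementation wraps its suspend call in something that starts the coroutine and hands the outcome to the effect type, which is roughly the role `Async<F>#effect` is described as playing earlier in the thread. The sketch below shows that general idea with the standard library only. `effectToFuture`, `lookup`, `FutureSource` and `SuspendBackedSource` are hypothetical names, and this is not how Arrow implements `effect`.

```kotlin
import java.util.concurrent.CompletableFuture
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical helper: starts a suspend block and completes a future with its outcome.
fun <A> effectToFuture(block: suspend () -> A): CompletableFuture<A> {
    val future = CompletableFuture<A>()
    block.startCoroutine(Continuation<A>(EmptyCoroutineContext) { result ->
        result.fold({ future.complete(it) }, { future.completeExceptionally(it) })
    })
    return future
}

// The interface has no suspend modifier.
interface FutureSource {
    fun findOne(key: Int): CompletableFuture<String>
}

// A suspend function the implementation wants to call.
suspend fun lookup(key: Int): String = "user-$key"

// The implementation stays non-suspend and bridges through the helper.
class SuspendBackedSource : FutureSource {
    override fun findOne(key: Int): CompletableFuture<String> =
        effectToFuture { lookup(key) }
}

fun main() {
    println(SuspendBackedSource().findOne(7).get()) // user-7
}
```

With this shape the `suspend` modifier lives only inside the lambda, so the same interface could in principle be backed by coroutines, Rx, or any other effect type that offers a comparable bridge.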