Pedro Silva
09/20/2019, 2:14 PMfun CompletionStage<V>.asIO(): IO<V> = IO.async then I won't be able to have fun <V> startThings(a: Async<V>) = object : MemSource<V>, Async<V> by a. What am I missing that connects IO<F> to Async<F>?simon.vergauwen
09/20/2019, 2:15 PMsimon.vergauwen
09/20/2019, 2:16 PMAsync<ForIO and IO<A> btw, not sure if that clears anything up.simon.vergauwen
09/20/2019, 2:16 PMAsync<F> defines behavior/logic for F.simon.vergauwen
09/20/2019, 2:16 PMAsync<ForIO>.async { ... } == IO.async { ... }Pedro Silva
09/20/2019, 2:34 PMAsync<F> defines behaviour/logic for F and IO.async defines the IO<F>.
interface Source<F> : Async<F>
interface Printer<F> : Monad<F>
interface Fetcher<F> : Source<F>, Printer<F>
fun startThingsForIO(src: Source<ForIO>, a: Async<ForIO>) = object : Async<ForIO> by a
, Source<ForIO> by src
, Fetcher<ForIO> {}Pedro Silva
09/20/2019, 2:37 PMPedro Silva
09/20/2019, 2:37 PMObject must override public open val fx: AsyncFx<ForIO> defined in arrow.kt.tests.main.startThingsForIO.<no name provided> because it inherits many implementations of it
e: .../arrow-kt-tests/src/main/kotlin/arrow/kt/tests/Problem.kt: (58, 65): Object must override public open fun <A> Ref(a: A): Kind<ForIO, Ref<ForIO, A>> defined in arrow.kt.tests.main.startThingsForIO.<no name provided> because it inherits many implementations of itPedro Silva
09/20/2019, 2:40 PMSource<F> extends Async<F> and the object : also extends Async<F> the compiler doesn't know which one to choose.Pedro Silva
09/20/2019, 2:41 PMfun startThingsForIO(src: Source<ForIO>) = object : Source<ForIO> by src, Fetcher<ForIO> make sense?
How do I pass it a Source for CompletionStage for example?Bob Glamm
09/20/2019, 2:43 PMBob Glamm
09/20/2019, 2:44 PMPedro Silva
09/20/2019, 3:21 PMfun startThingsForIO(src: Source<ForIO>) = object : Source<ForIO> by src, Fetcher<ForIO> {
}
startThingsForIO(object : MemSource<???> {
override val db: MutableMap<Int, User> = mutableMapOf()
})Pedro Silva
09/20/2019, 3:30 PMCompletionStage<V>.asIO fits in here.simon.vergauwen
09/21/2019, 8:37 AMsimon.vergauwen
09/21/2019, 8:38 AMPedro Silva
09/21/2019, 4:46 PMForIO how do we actually start it?
2. What changes would have to be made to run this with a ForOption?
3. Given that source has 1 method that is not a suspend fun this will never work with coroutines, so even if I use CompletionStage, or Observable or any other async abstraction the suspend modifier will still have to be present in the interface, correct?Pedro Silva
09/21/2019, 4:47 PMPedro Silva
09/21/2019, 4:47 PMPedro Silva
09/21/2019, 4:47 PMsimon.vergauwen
09/21/2019, 5:25 PMForIO how do we actually start it?
Running an IO can be done in a couple of different ways but this is ideally done at the end of your program, ideally you would never run the IO yourself and instead do fun main(arg: Array<String>): IO<Unit> = program().
In the cases where you have to run it yourself because you don’t have fun main(arg: Array<String>): IO<Unit> then you should use unsafeRunAsync or unsafeRunCancelable. There is an example in the snippet attached below.
2. What changes would have to be made to run this with a ForOption?
You cannot run this program with Option since it doesn’t have the power of doing running effects in a controlled environment. This is clear from the instances as Option only has instances up to Monad, so no instance for Async to satisfy the constraints of your program.
Option, Either, State, etc are tools to help model certain types of data while IO, RxJava, Reactor etc provide tools for working with side-effects in a controlled way, and by doing so they can offer powerful concurrency tools that help solve real business problems like recovering from errors, concurrency, parallelism, composing different effect-full programs etc. Running this program with RxJava, Reactor (or any other effect lib) could be done simply by using the Async instance for them. I’ve added examples for RxJava & Reactor below.
3. Given that source has 1 method that is not a suspend fun this will never work with coroutines, so even if I use CompletionStage, or Observable or any other async abstraction the suspend modifier will still have to be present in the interface, correct?
I am not sure I 100% follow but you can consume suspend within F by using Async<F>#effect which takes a suspend lambda as it’s parameter. You can currently also move from IO<A> to A using a suspend function suspend fun IO<A>.suspended(): A. This function will also make it into the type-class hierarchy soon but is not available there yet but by looking at the implementation of IO it should be easy to write the same function for other `F`s.simon.vergauwen
09/21/2019, 5:26 PMsimon.vergauwen
09/21/2019, 5:26 PMPedro Silva
09/22/2019, 2:13 PMAsync<F> for those types and then run as startThingsForF(VertxK.async().memSource()) or startThingsForF(CompletionStageK.async().memSource()), correct?
2. I suspected that. Thank you for confirming, it makes sense.
3. The goal here was to not have 2 interfaces. The current Source<T> interface is compatible with any IO (from Arrow) capable async abstraction but does it support running with coroutines?
If this was only meant to work with coroutines then traditionally one would write:
interface Source<F> {
suspend fun findOne(key: Int): Kind<F, User>
}
And then I would have an implementation with coroutines:
interface CoroutineSource<F> {
// assume a coroutine context field available here
suspend fun findOne(key: Int): Kind<F, User> {
effect { /* run the find on the coroutine context */ }
}
}
With the suspend modifier in the interface and implementation, is this still usable with Observable and Rx, for example?Pedro Silva
09/22/2019, 2:15 PMSource<F> without the suspend modifier, I cannot have an implementation with the suspend modifier