pakoito
11/26/2019, 11:56 AM<http://Dispatchers.IO|Dispatchers.IO>
here is where the ios are started, and each can run on its own schedulerMark Fisher
11/26/2019, 12:01 PM<http://Dispatchers.IO|Dispatchers.IO>
to work, the Dispatchers it finds is arrow.fx.typeclasses
, which has an io()
method, but this doesn't work either. I think i've got a dependency issue.Mark Fisher
11/26/2019, 12:01 PMMark Fisher
11/26/2019, 12:03 PMMark Fisher
11/26/2019, 12:06 PMIODispatchers.IOPool
simon.vergauwen
11/26/2019, 12:08 PMIO.dispatchers().default()
or IO.dispatchers().io()
. You an also parameterize over Dispatchers
if needed.simon.vergauwen
11/26/2019, 12:08 PM<http://Dispatchers.IO|Dispatchers.IO>
Paco is mentioning comes from KotlinX but I don’t advice using those with Arrow Fx since they come with some overhead specific to KotlinX.Mark Fisher
11/26/2019, 12:10 PMasCoroutineContext()
extension function on Scheduler, so I can use the same pool I was using in the RX world, i.e. <http://Schedulers.io|Schedulers.io>().asCoroutineContext()
Mark Fisher
11/26/2019, 12:18 PMsimon.vergauwen
11/26/2019, 1:01 PMsimon.vergauwen
11/26/2019, 1:02 PMDispatchers
I used meant take them as a parameter, so you could for example replace the CoroutineContext
to use IO.dispatchers()
or your own IORxJavaDispatchers
.simon.vergauwen
11/26/2019, 1:05 PMfun program(dispatchers: Dispatchers<ForIO>) = IO.fx {
val nameIO = !effect(<http://dispatchers.io|dispatchers.io>()) { println(Thread.currenThread().name) }
val nameComputation = !effect(dispatchers.computation()) { println(Thread.currenThread().name) }
!effect { println("$nameIO ~> $nameComputation") }
}
fun <F> rxDispatchers() = object : Dispatchers<F> {
fun computation() = Schedulers.computation().asCoroutineContext()
fun io() = <http://Schedulers.io|Schedulers.io>().asCoroutineContext()
}
suspend fun main() {
program(IO.dispatchers()).suspended()
program(rxDispatchers<ForIO>()).suspended()
}
Mark Fisher
11/26/2019, 1:15 PMMark Fisher
11/26/2019, 1:15 PMimport arrow.fx.ForIO
import <http://arrow.fx.IO|arrow.fx.IO>
import arrow.fx.extensions.fx
import <http://arrow.fx.extensions.io|arrow.fx.extensions.io>.dispatchers.dispatchers
import arrow.fx.rx2.extensions.asCoroutineContext
import arrow.fx.typeclasses.Dispatchers
import io.reactivex.schedulers.Schedulers
fun program(dispatchers: Dispatchers<ForIO>) = IO.fx {
val nameIO = !effect(<http://dispatchers.io|dispatchers.io>()) { println(Thread.currentThread().name) }
val nameComputation = !effect(dispatchers.default()) { println(Thread.currentThread().name) }
!effect { println("$nameIO ~> $nameComputation") }
}
fun <F> rxDispatchers() = object : Dispatchers<F> {
override fun default() = Schedulers.computation().asCoroutineContext()
override fun io() = <http://Schedulers.io|Schedulers.io>().asCoroutineContext()
}
suspend fun main() {
program(IO.dispatchers()).suspended()
program(rxDispatchers()).suspended()
}
simon.vergauwen
11/26/2019, 1:15 PM