Mark Fisher
01/23/2020, 10:48 AM
@Test
fun `concurrent arrow and rx`() {
    val x = SingleK.fx {
        val requiredValue = !getCountSingle
        println("network call A")
        val networkCallAValue = !pauseAndIncrement(requiredValue).k()
        println("network call B")
        val networkCallBValue = !pauseAndIncrement(requiredValue).k()
        networkCallAValue + networkCallBValue
    }.value()
    assertThat(x.blockingGet()).isEqualTo(4)
}

private fun pauseAndIncrement(i: Int): Single<Int> {
    println("Entering pause...")
    Thread.sleep(2000)
    println("... returning single")
    return Single.just(i + 1)
}
getCountSingle in this context should block the subsequent assignments until it is available (this is fine), but its value should be usable by the two subsequent calls, which I'd like to run concurrently since they are independent; I then just need to block on the results of both.
Is there a nice way of doing this without it becoming spaghetti code?
raulraja
01/23/2020, 10:51 AM
raulraja
01/23/2020, 10:51 AM
raulraja
01/23/2020, 10:51 AM
Mark Fisher
01/23/2020, 10:52 AM
raulraja
01/23/2020, 10:52 AM
raulraja
01/23/2020, 10:53 AM
Mark Fisher
01/23/2020, 10:55 AM
raulraja
01/23/2020, 10:57 AM
raulraja
01/23/2020, 10:58 AM
simon.vergauwen
01/23/2020, 11:01 AM
parMapN, raceN, parTraverse & parSequence in favor of fork/join, since those are optimised and guarantee resource-safety, correct error-handling and cancellation.
simon.vergauwen
01/23/2020, 11:01 AM
SingleK.fx or through importing the extension functions from the concurrent package
Mark Fisher
01/23/2020, 12:29 PM
@Test
fun `concurrent arrow and rx`() {
    val x = SingleK.fx {
        val requiredValue = !getCountSingle
        println("creating y")
        val y = Schedulers.io().asCoroutineContext().parMapN(
            pauseAndIncrement(requiredValue).k(),
            pauseAndIncrement(requiredValue).k()) { a, b ->
            a + b
        }
        println("doing effect of y")
        !y
    }.value()
    println("doing assert")
    assertThat(x.blockingGet()).isEqualTo(4)
}
simon.vergauwen
01/23/2020, 12:30 PM
Concurrent also provides sleep, but you could use Single.timer from RxJava as well. You should use that instead of Thread.sleep anyway.
I should check, because I wouldn't expect it to block there TBH. Schedulers.io() should be a separate worker thread for each task
Mark Fisher
01/23/2020, 12:30 PM
doing assert
creating y
Entering pause...
... returning single
Entering pause...
... returning single
doing effect of y
I was hoping to see "Entering pause..." twice in a row, rather than the two calls running sequentially.
simon.vergauwen
01/23/2020, 12:31 PM
Mark Fisher
01/23/2020, 12:32 PM
package jl.catalogue.summary.arrow

import arrow.fx.rx2.SingleK
import arrow.fx.rx2.SingleKOf
import arrow.fx.rx2.extensions.asCoroutineContext
import arrow.fx.rx2.extensions.fx
import arrow.fx.rx2.extensions.singlek.monadDefer.monadDefer
import arrow.fx.rx2.k
import arrow.fx.rx2.value
import arrow.fx.typeclasses.MonadDefer
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test

class ArrowRxTests {
    @Test
    fun `comprehension with arrow and rx`() {
        val x = SingleK.fx {
            val count = !getCountSingle
            val newCount = !increment(count).k()
            newCount
        }.value()
        assertThat(x.blockingGet()).isEqualTo(2)
    }

    private fun getCount() = 1
    private fun <F> getCountAsync(MS: MonadDefer<F>) = MS.later { getCount() }
    private val getCountSingle: SingleKOf<Int> = getCountAsync(SingleK.monadDefer())
    private fun increment(i: Int): Single<Int> = Single.just(i + 1)

    @Test
    fun `concurrent arrow and rx`() {
        val x = SingleK.fx {
            val requiredValue = !getCountSingle
            println("creating y")
            val y = Schedulers.io().asCoroutineContext().parMapN(
                pauseAndIncrement(requiredValue).k(),
                pauseAndIncrement(requiredValue).k()) { a, b ->
                a + b
            }
            println("doing effect of y")
            !y
        }.value()
        println("doing assert")
        assertThat(x.blockingGet()).isEqualTo(4)
    }

    private fun pauseAndIncrement(i: Int): Single<Int> {
        println("Entering pause...")
        Thread.sleep(2000)
        println("... returning single")
        return Single.just(i + 1)
    }
}
Mark Fisher
01/23/2020, 12:46 PM
private fun pauseAndIncrement(i: Int): Single<Int> {
    return Single.fromCallable {
        println("Entering pause...")
        Thread.sleep(2000)
        println("... returning single")
        i + 1
    }
}
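The two versions of pauseAndIncrement differ in when the sleep runs. In the earlier version, Thread.sleep executes as soon as the function is called, which is while the arguments to parMapN are still being built, so both pauses finish on the calling thread before anything reaches a scheduler. Wrapping the body in Single.fromCallable defers it until subscription. Below is a minimal sketch of the same effect, assuming plain java.util.concurrent as a stand-in for RxJava and Arrow. The names slowIncrement, eagerTask, deferredTask and runBoth are hypothetical, and the 200 ms pause is arbitrary.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical stand-in for the network call: sleeps to simulate latency.
fun slowIncrement(i: Int, pauseMs: Long = 200): Int {
    Thread.sleep(pauseMs)
    return i + 1
}

// Eager: the sleep happens here, on the caller's thread, before the Callable exists.
// This mirrors sleeping first and then returning Single.just(i + 1).
fun eagerTask(i: Int): Callable<Int> {
    val result = slowIncrement(i)
    return Callable { result }
}

// Deferred: nothing runs until some thread invokes the Callable.
// This mirrors wrapping the body in Single.fromCallable { ... }.
fun deferredTask(i: Int): Callable<Int> = Callable { slowIncrement(i) }

// Builds two tasks with the given factory, runs them on a two-thread pool, and
// returns the sum together with the elapsed wall-clock time in milliseconds.
fun runBoth(factory: (Int) -> Callable<Int>): Pair<Int, Long> {
    val pool = Executors.newFixedThreadPool(2)
    try {
        val start = System.nanoTime()
        // With an eager factory, each call below blocks before the pool is involved.
        val tasks = listOf(factory(1), factory(1))
        val sum = pool.invokeAll(tasks).map { it.get() }.sum()
        val elapsedMs = (System.nanoTime() - start) / 1_000_000
        return sum to elapsedMs
    } finally {
        pool.shutdown()
    }
}

fun main() {
    val (eagerSum, eagerMs) = runBoth(::eagerTask)
    val (deferredSum, deferredMs) = runBoth(::deferredTask)
    println("eager:    sum=$eagerSum, took about $eagerMs ms")
    println("deferred: sum=$deferredSum, took about $deferredMs ms")
}
```

Both variants produce the same sum. The eager one should take roughly two pauses and the deferred one roughly one, since only the deferred tasks actually overlap on the pool.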
simon.vergauwen
01/23/2020, 1:01 PM
creating y
doing effect of y
Entering pause...
Entering pause...
... returning single
... returning single
Mark Fisher
01/23/2020, 1:04 PM
simon.vergauwen
01/23/2020, 1:04 PM
k() and fix() wise).
Now the code is also properly called in parallel 🙂 You should never use Thread.sleep because it blocks a thread, which can also disrupt how certain async jumps are made in schedulers etc.
simon.vergauwen
01/23/2020, 1:05 PM
Mark Fisher
01/23/2020, 1:05 PM
Mark Fisher
01/23/2020, 1:16 PM
simon.vergauwen
01/23/2020, 1:17 PM