# arrow
m
Hi All, continuing my understanding of Arrow and concurrency: how do I write a comprehension that makes concurrent calls within it? My naive first attempt is this, but the "network" calls happen sequentially.
@Test
    fun `concurrent arrow and rx`() {

        val x = SingleK.fx {
            val requiredValue = !getCountSingle
            println("network call A")
            val networkCallAValue = !pauseAndIncrement(requiredValue).k()
            println("network call B")
            val networkCallBValue = !pauseAndIncrement(requiredValue).k()

            networkCallAValue + networkCallBValue
        }.value()

        assertThat(x.blockingGet()).isEqualTo(4)
    }

    private fun pauseAndIncrement(i: Int): Single<Int> {
        println("Entering pause...")
        Thread.sleep(2000)
        println("... returning single")
        return Single.just(i + 1)
    }
getCountSingle in this context should block the subsequent assignments until it is available (this is fine), but its value should be usable by the 2 subsequent calls, which I'd like to happen concurrently as they are independent; I just need to block on the result of them both. Is there a nice way of doing this without it becoming spaghetti code?
r
Hi Mark, you can use fibers https://arrow-kt.io/docs/fx/async/#fibers
Or combinators like parMap or parTraverse
m
Checking that out now, thanks Raul
r
Fibers do what you want with fine-grained control, where you can join them back
If you are already familiar with async/await it's similar but called fork/join
m
I'm only used to using Rx with Single.zip to do this, but this has led to some code that doesn't flow particularly well, with too much muddying of the logic. I'd like to create simpler comprehensions where the concurrency adds as little as possible to the method.
r
Yes, that is a good case for fibers. A fiber is like a regular binding, but you control when it starts and when it blocks. Fibers use work stealing, which means they are efficient and don't necessarily need a custom pool or thread
Though if you are using I/O-bound operations underneath, like JDBC calls or other things that actually block a thread, you may want to use the IO dispatcher for those fibers. It is optimized for those kinds of ops and makes more threads available
s
You should always favor the bundled operators parMapN, raceN, parTraverse & parSequence over fork/join, since those are optimised and guarantee resource safety, correct error handling and cancellation.
RxJava also has all these concurrent operators available within SingleK.fx or through importing the extension functions from the concurrent package
m
I've changed the code to this, but parMapN is running its 2 functions sequentially. Is that because of the Thread.sleep, and they are actually on the same thread?
@Test
    fun `concurrent arrow and rx`() {

        val x = SingleK.fx {
            val requiredValue = !getCountSingle

            println("creating y")
            val y = Schedulers.io().asCoroutineContext().parMapN(
                    pauseAndIncrement(requiredValue).k(),
                    pauseAndIncrement(requiredValue).k()) { a, b ->
                a + b
            }

            println ("doing effect of y")
            !y
        }.value()

        println("doing assert")
        assertThat(x.blockingGet()).isEqualTo(4)
    }
s
Concurrent also provides sleep, but you could also use Single.timer from RxJava. You should use that instead of Thread.sleep anyway. I should check, because I wouldn't expect it to block there TBH. Schedulers.io() should be a separate worker thread for each task
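A minimal sketch of the Single.timer suggestion, assuming RxJava 2 is on the classpath. The name pauseAndIncrementNonBlocking is hypothetical and only mirrors pauseAndIncrement from the thread; it is not code that was posted in the conversation.

```kotlin
import io.reactivex.Single
import java.util.concurrent.TimeUnit

// Hypothetical replacement for pauseAndIncrement: the 2-second delay is
// scheduled by RxJava instead of blocking the calling thread.
// Single.timer emits 0L after the delay; map discards it and returns i + 1.
fun pauseAndIncrementNonBlocking(i: Int): Single<Int> =
    Single.timer(2, TimeUnit.SECONDS).map { i + 1 }

fun main() {
    // Building the Single returns immediately; nothing has been delayed yet.
    val single = pauseAndIncrementNonBlocking(1)
    // The delay only starts once something subscribes (blockingGet does).
    println(single.blockingGet())
}
```

Because the delay is part of the Single rather than of the function that builds it, whatever combines two of these values decides when and where the waiting happens.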
m
It prints following when run:
doing assert
creating y
Entering pause...
... returning single
Entering pause...
... returning single
doing effect of y
I was hoping to see "Entering pause..." printed twice up front, instead of the two calls running sequentially.
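One thing the output above shows is that both pauses finish before "doing effect of y" is printed. With the first version of pauseAndIncrement, the Thread.sleep runs in the function body while the arguments are being built, before any combinator sees them. The sketch below illustrates that with the Kotlin/JVM standard library only; eagerWork, deferredWork and runBoth are hypothetical stand-ins, not Arrow or RxJava APIs.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Eager: sleeps while being called, then returns an already-computed value
// (analogous to sleeping first and then returning Single.just(i + 1)).
fun eagerWork(i: Int): () -> Int {
    Thread.sleep(200)
    val result = i + 1
    return { result }
}

// Deferred: returns immediately; the sleep runs only when the thunk is invoked.
fun deferredWork(i: Int): () -> Int = {
    Thread.sleep(200)
    i + 1
}

// Hypothetical stand-in for a parallel combinator: runs both thunks on two threads.
fun <A, B, C> runBoth(fa: () -> A, fb: () -> B, f: (A, B) -> C): C {
    val pool = Executors.newFixedThreadPool(2)
    try {
        val a = pool.submit(Callable { fa() })
        val b = pool.submit(Callable { fb() })
        return f(a.get(), b.get())
    } finally {
        pool.shutdown()
    }
}

fun main() {
    val t0 = System.nanoTime()
    // Both arguments are evaluated, one after the other, before runBoth starts.
    val eager = runBoth(eagerWork(1), eagerWork(1)) { x, y -> x + y }
    val t1 = System.nanoTime()
    // Here the sleeps happen inside runBoth, on separate threads.
    val deferred = runBoth(deferredWork(1), deferredWork(1)) { x, y -> x + y }
    val t2 = System.nanoTime()
    println("eager: $eager in ${(t1 - t0) / 1_000_000} ms")
    println("deferred: $deferred in ${(t2 - t1) / 1_000_000} ms")
}
```

The eager variant should take about twice as long as the deferred one, since its two sleeps cannot overlap no matter how the results are combined afterwards.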
s
Yes, can you share the complete snippet again?
m
package jl.catalogue.summary.arrow

import arrow.fx.rx2.SingleK
import arrow.fx.rx2.SingleKOf
import arrow.fx.rx2.extensions.asCoroutineContext
import arrow.fx.rx2.extensions.fx
import arrow.fx.rx2.extensions.singlek.monadDefer.monadDefer
import arrow.fx.rx2.k
import arrow.fx.rx2.value
import arrow.fx.typeclasses.MonadDefer
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test

class ArrowRxTests {
    @Test
    fun `comprehension with arrow and rx`() {
        val x = SingleK.fx {
            val count = !getCountSingle
            val newCount = !increment(count).k()
            newCount
        }.value()

        assertThat(x.blockingGet()).isEqualTo(2)
    }

    private fun getCount() = 1

    private fun <F> getCountAsync(MS: MonadDefer<F>) = MS.later { getCount() }
    private val getCountSingle: SingleKOf<Int> = getCountAsync(SingleK.monadDefer())

    private fun increment(i: Int): Single<Int> = Single.just(i + 1)


    @Test
    fun `concurrent arrow and rx`() {

        val x = SingleK.fx {
            val requiredValue = !getCountSingle

            println("creating y")
            val y = Schedulers.io().asCoroutineContext().parMapN(
                    pauseAndIncrement(requiredValue).k(),
                    pauseAndIncrement(requiredValue).k()) { a, b ->
                a + b
            }

            println ("doing effect of y")
            !y
        }.value()

        println("doing assert")
        assertThat(x.blockingGet()).isEqualTo(4)
    }

    private fun pauseAndIncrement(i: Int): Single<Int> {
        println("Entering pause...")
        Thread.sleep(2000)
        println("... returning single")
        return Single.just(i + 1)
    }

}
Also tried changing pauseAndIncrement to a pure Single, but still didn't work:
private fun pauseAndIncrement(i: Int): Single<Int> {
        return Single.fromCallable {
            println("Entering pause...")
            Thread.sleep(2000)
            println("... returning single")
            i + 1
        }
    }
s
creating y
doing effect of y
Entering pause...
Entering pause...
... returning single
... returning single
m
Have you changed anything? The test takes 4s here, so I'm definitely getting sequential, non-parallel behaviour.
s
I slightly changed the code since I am using KotlinTest assertions here, but besides that I think this should make your code a bit cleaner (k() and fix() wise). Now the code is also properly called in parallel 🙂 You should never use Thread.sleep because it blocks a thread, which can also interfere with how certain async jumps are made in schedulers etc.
Takes 2s62ms after my changes
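The changed snippet itself is not included in the thread. For comparison only, here is a rough sketch of the same "two independent calls, then combine" shape in plain RxJava 2, using Single.zip and subscribeOn rather than Arrow's parMapN. It assumes RxJava 2 on the classpath, and Thread.sleep stands in for blocking work such as a network call.

```kotlin
import io.reactivex.Single
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers

// The blocking work is deferred inside fromCallable and moved onto the io
// scheduler, so it runs only on subscription and on its own worker thread.
fun pauseAndIncrement(i: Int): Single<Int> =
    Single.fromCallable {
        println("Entering pause...")
        Thread.sleep(2000) // emulated blocking call
        println("... returning single")
        i + 1
    }.subscribeOn(Schedulers.io())

fun main() {
    val start = System.currentTimeMillis()
    // zip subscribes to both sources; each one does its work on an io worker.
    val sum = Single.zip(
        pauseAndIncrement(1),
        pauseAndIncrement(1),
        BiFunction<Int, Int, Int> { a, b -> a + b }
    ).blockingGet()
    println("sum = $sum after ${System.currentTimeMillis() - start} ms")
}
```

With both sources subscribed on the io scheduler, the two pauses should overlap, so the total should be close to one pause rather than two.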
m
Thanks. I'm only using Thread.sleep as the 'work' emulator (e.g. network call in real-world where this will be used)
This is great. Many thanks @simon.vergauwen I'm going to apply changes to the real case scenarios now
s
You’re very welcome @Mark Fisher! If you have any other questions I’d be happy to help 👍