https://kotlinlang.org logo
#coroutines
Title
# coroutines
g

ghedeon

03/06/2019, 1:36 PM
It drives me crazy. Rx version works and Coroutine version is flaky and failing in 25% of the runs. Do I miss anything about
GlobalScope.rxFlowable{}
?
g

gildor

03/06/2019, 1:42 PM
Probably coroutine version uses coroutine dispatcher with event loop, so event is not always available, add await for value before check
Bot sure how rxFlowable implemented, but all dispatchers, even runBlocking and Unconfined use event loop now
Probably in this case rxFlowable running on Default dispatcher
g

ghedeon

03/06/2019, 1:45 PM
yes, my guess was about dispatchers and wrapping in
runBlocking
didn't help
g

gildor

03/06/2019, 1:46 PM
runBlocking also has event loop of events, so it asyncronous even on the same thread
Use some of await functions of TestObserver to await event
g

ghedeon

03/06/2019, 1:48 PM
yes, that works
the real case is a bit more complicated, tho. I don't have access to the flowable itself, because it's being converted to LiveData (don't ask why 😒)
LiveDataReactiveStreams.fromPublisher(flowable)
. So, what ends up in the public API is the livedata, backed by this flowable.
Thus, somewhere in the test I have
liveData.observeForever(observer)
and naturally, the observer is flaky.
Copy code
@Test
    fun foo() {
        // given
        val call: () -> Unit = mock()

        val flowable = GlobalScope.rxFlowable {
            call()
            send("Foo")
        }
        val liveData = LiveDataReactiveStreams.fromPublisher(flowable)

        // when
        liveData.observeForever(mock())

        // then
        verify(call).invoke()
    }
Any idea how to make this stuff sequential, assuming that flowable is not exposed?
I guess, I should inject dispatcher in the
rxFlowable
then...
e

elizarov

03/06/2019, 2:13 PM
GlobalScope.rxFlowable(Dispatchers.Unconfined)
g

ghedeon

03/06/2019, 2:13 PM
Hm.. it becomes sequential with
rxFlowable(Dispatchers.Unconfined)
e

elizarov

03/06/2019, 2:13 PM
That is the equivalent of Rx version 😉
g

ghedeon

03/06/2019, 2:13 PM
simple smile
e

elizarov

03/06/2019, 2:13 PM
Rx is unconfined by default, Coroutines work in background by default. But you can change defaults in both libs.
g

ghedeon

03/06/2019, 2:16 PM
Thank you! What about
GlobalScope.rxFlowable{}.subscribeOn(scheduler)
? Shouldn't provided rx scheduler somehow replace the coroutine dispatcher? Doesn't seem like it's happening now.
e

elizarov

03/07/2019, 6:43 AM
subscribeOn only works for sources that do not have their own scheduler.
👍 1
3 Views