It drives me crazy. Rx version works and Coroutine...
# coroutines
g
It drives me crazy. Rx version works and Coroutine version is flaky and failing in 25% of the runs. Do I miss anything about
GlobalScope.rxFlowable{}
?
g
Probably coroutine version uses coroutine dispatcher with event loop, so event is not always available, add await for value before check
Bot sure how rxFlowable implemented, but all dispatchers, even runBlocking and Unconfined use event loop now
Probably in this case rxFlowable running on Default dispatcher
g
yes, my guess was about dispatchers and wrapping in
runBlocking
didn't help
g
runBlocking also has event loop of events, so it asyncronous even on the same thread
Use some of await functions of TestObserver to await event
g
yes, that works
the real case is a bit more complicated, tho. I don't have access to the flowable itself, because it's being converted to LiveData (don't ask why 😒)
LiveDataReactiveStreams.fromPublisher(flowable)
. So, what ends up in the public API is the livedata, backed by this flowable.
Thus, somewhere in the test I have
liveData.observeForever(observer)
and naturally, the observer is flaky.
Copy code
@Test
    fun foo() {
        // given
        val call: () -> Unit = mock()

        val flowable = GlobalScope.rxFlowable {
            call()
            send("Foo")
        }
        val liveData = LiveDataReactiveStreams.fromPublisher(flowable)

        // when
        liveData.observeForever(mock())

        // then
        verify(call).invoke()
    }
Any idea how to make this stuff sequential, assuming that flowable is not exposed?
I guess, I should inject dispatcher in the
rxFlowable
then...
e
GlobalScope.rxFlowable(Dispatchers.Unconfined)
g
Hm.. it becomes sequential with
rxFlowable(Dispatchers.Unconfined)
e
That is the equivalent of Rx version 😉
g
simple smile
e
Rx is unconfined by default, Coroutines work in background by default. But you can change defaults in both libs.
g
Thank you! What about
GlobalScope.rxFlowable{}.subscribeOn(scheduler)
? Shouldn't provided rx scheduler somehow replace the coroutine dispatcher? Doesn't seem like it's happening now.
e
subscribeOn only works for sources that do not have their own scheduler.
👍 1