https://kotlinlang.org logo
#coroutines
Title
# coroutines
d

david.bilik

07/24/2019, 5:45 AM
hello, what is the recommended way to do this - I have an api layer that has suspend functions .. my viewmodel implements
CoroutineScope
so I can correctly cancel jobs when my view is destroyed .. and I use repository pattern to fetch/store data. In one case though I use RxJava to observe database changes and If there are no data in database, i will fetch data from Api.. the method looks something like this
Copy code
override suspend fun observeData(): Flowable<Data> {
    return dao.observeData()
        .doOnNext {
            if (it.isEmpty()) {
                dao.insertData(api.fetchData())
            }
        }
}
however as you may notice, this does not work because the
doOnNext
method is not suspend and i cannot call my api. I’ve been trying some approaches, the current one which is working but I dont know if its correct is this one
Copy code
override suspend fun observeData(scope: CoroutineScope): Flowable<Data> {
    return dao.observeData()
        .doOnNext {
            if (it.isEmpty()) {
                scope.launch {
                    dao.insertData(api.fetchData())
                }
            }
        }
}
any better approaches? The easiest would be to implement
CoroutineScope
in my repository but it does not have clear strict lifecycle so I could cancel jobs when its “ending”
g

groostav

07/24/2019, 6:13 AM
This signature is really funky. 1.
suspend fun
and
returns Flowabe<Data>
is redundant 2.
suspend fun a(scope: CoroutineScope)
by convention you either
suspend fun
or take a
coroutineScope
to put jobs in... but as you've written it it should compile and run, it'l just be a little more asynchronous than you really need.
d

david.bilik

07/24/2019, 6:17 AM
yes, the
suspend
keyword is redundant on the
observeData
method. So accepting scope as function parameter is used?
g

groostav

07/24/2019, 6:24 AM
if your looking for a quick fix: yeah,
scope.luanch
should do the trick, but I would "default" to
runBlocking
, since it'l keep everything sequential maybe using a flowable combinator to put you on an IO thread first --assuming your API is IO related A longer fix: It seems to me that if your API and view are both using coroutines, I would go all the way, and convert your
Flowable<Data>
into a
ReceiveChannel<Data>
kotlinx.coroutines.flow.Flow
, and I'd wager kotlinx.coroutines.rxjava has an extension function for that. Then your
obseveData
becomes
Copy code
fun observeData(scope: CoroutineScope): ReceiveChannel<Data> {
  return dao.observeData().asFlow().map(scope) {
    if(it.isEmpty()){
      dao.insertData(api.fetchData())
    }
  }
}
but actually
map
is tricky and a bit funny, so using the
produce
and a for-each loop on
dao
might be simpler
t

tseisel

07/24/2019, 6:48 AM
Try replacing
doOnNext
with
concatMapSingle
, passing a
Single
created with
rxSingle
from
kotlinx-coroutines-rx2
. You can call suspending functions from the body of
rxSingle
.
d

Dico

07/24/2019, 10:00 AM
Instead of converting to a ReceiveChannel, converting to
Flow
may be better
👍 2
It could use the context of the collector to fetch the data
g

groostav

07/24/2019, 7:46 PM
@Dico that is true, use
Flow
instead of
Channel
if you want to go that route.
5 Views