david.bilik
07/24/2019, 5:45 AMCoroutineScope
so I can correctly cancel jobs when my view is destroyed .. and I use repository pattern to fetch/store data. In one case though I use RxJava to observe database changes and If there are no data in database, i will fetch data from Api.. the method looks something like this
override suspend fun observeData(): Flowable<Data> {
return dao.observeData()
.doOnNext {
if (it.isEmpty()) {
dao.insertData(api.fetchData())
}
}
}
however as you may notice, this does not work because the doOnNext
method is not suspend and i cannot call my api. I’ve been trying some approaches, the current one which is working but I dont know if its correct is this one
override suspend fun observeData(scope: CoroutineScope): Flowable<Data> {
return dao.observeData()
.doOnNext {
if (it.isEmpty()) {
scope.launch {
dao.insertData(api.fetchData())
}
}
}
}
any better approaches? The easiest would be to implement CoroutineScope
in my repository but it does not have clear strict lifecycle so I could cancel jobs when its “ending”groostav
07/24/2019, 6:13 AMsuspend fun
and returns Flowabe<Data>
is redundant
2. suspend fun a(scope: CoroutineScope)
by convention you either suspend fun
or take a coroutineScope
to put jobs in...
but as you've written it it should compile and run, it'l just be a little more asynchronous than you really need.david.bilik
07/24/2019, 6:17 AMsuspend
keyword is redundant on the observeData
method. So accepting scope as function parameter is used?groostav
07/24/2019, 6:24 AMscope.luanch
should do the trick, but I would "default" to runBlocking
, since it'l keep everything sequential maybe using a flowable combinator to put you on an IO thread first --assuming your API is IO related
A longer fix: It seems to me that if your API and view are both using coroutines, I would go all the way, and convert your Flowable<Data>
into a ReceiveChannel<Data>
kotlinx.coroutines.flow.Flow
, and I'd wager kotlinx.coroutines.rxjava has an extension function for that. Then your obseveData
becomes
fun observeData(scope: CoroutineScope): ReceiveChannel<Data> {
return dao.observeData().asFlow().map(scope) {
if(it.isEmpty()){
dao.insertData(api.fetchData())
}
}
}
but actually map
is tricky and a bit funny, so using the produce
and a for-each loop on dao
might be simplertseisel
07/24/2019, 6:48 AMdoOnNext
with concatMapSingle
, passing a Single
created with rxSingle
from kotlinx-coroutines-rx2
. You can call suspending functions from the body of rxSingle
.Dico
07/24/2019, 10:00 AMFlow
may be betterDico
07/24/2019, 10:00 AMgroostav
07/24/2019, 7:46 PMFlow
instead of Channel
if you want to go that route.