david.bilik
07/24/2019, 5:45 AM
CoroutineScope so I can correctly cancel jobs when my view is destroyed, and I use the repository pattern to fetch/store data. In one case, though, I use RxJava to observe database changes, and if there is no data in the database, I fetch it from the API. The method looks something like this:
override suspend fun observeData(): Flowable<Data> {
    return dao.observeData()
        .doOnNext {
            if (it.isEmpty()) {
                dao.insertData(api.fetchData())
            }
        }
}
However, as you may notice, this does not work, because the doOnNext callback is not a suspend function, so I cannot call my API from it. I've been trying some approaches; the current one, which works but which I'm not sure is correct, is this:
override suspend fun observeData(scope: CoroutineScope): Flowable<Data> {
    return dao.observeData()
        .doOnNext {
            if (it.isEmpty()) {
                scope.launch {
                    dao.insertData(api.fetchData())
                }
            }
        }
}
Any better approaches? The easiest would be to implement CoroutineScope in my repository, but the repository does not have a clear, strict lifecycle that would let me cancel jobs when it is “ending”.
groostav
07/24/2019, 6:13 AM
1. Declaring it as a suspend fun that also returns Flowable<Data> is redundant.
2. suspend fun a(scope: CoroutineScope): by convention a function is either a suspend fun or takes a CoroutineScope to put jobs in, not both.
But as you've written it, it should compile and run; it'll just be a little more asynchronous than you really need.
david.bilik
07/24/2019, 6:17 AM
suspend keyword is redundant on the observeData method. So is accepting a scope as a function parameter something that is commonly done?
groostav
07/24/2019, 6:24 AM
scope.launch should do the trick, but I would "default" to runBlocking, since it'll keep everything sequential, maybe using a Flowable combinator to put you on an IO thread first, assuming your API is IO related.
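The runBlocking variant described above could be sketched roughly as follows. This is only an illustration under assumptions: it needs RxJava 2 and kotlinx-coroutines-core on the classpath, and Data, DataDao, DataApi, and DataRepository are hypothetical stand-ins for the types in the question (in particular, insertData and fetchData are both assumed to be suspend functions).

```kotlin
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the types used in the question.
typealias Data = List<String>

interface DataDao {
    fun observeData(): Flowable<Data>
    suspend fun insertData(data: Data) // assumed to be suspending
}

interface DataApi {
    suspend fun fetchData(): Data
}

class DataRepository(private val dao: DataDao, private val api: DataApi) {
    // Neither a suspend modifier nor a scope parameter is needed here:
    // the returned Flowable is already the asynchronous handle.
    fun observeData(): Flowable<Data> =
        dao.observeData()
            // Hop to an IO thread first, so that runBlocking blocks a thread
            // meant for blocking work rather than the upstream thread.
            .observeOn(Schedulers.io())
            .doOnNext { data ->
                if (data.isEmpty()) {
                    // Blocks this IO thread until fetch and insert have finished,
                    // which keeps the pipeline sequential.
                    runBlocking { dao.insertData(api.fetchData()) }
                }
            }
}
```

The trade-off is that a blocked IO thread cannot be cancelled cooperatively the way a launched coroutine can; the fetch simply runs to completion inside doOnNext.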
A longer fix: it seems to me that if your API and view are both using coroutines, I would go all the way and convert your Flowable<Data> into a ReceiveChannel<Data> or a kotlinx.coroutines.flow.Flow, and I'd wager kotlinx.coroutines.rxjava has an extension function for that. Then your observeData becomes
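fun observeData(): Flow<Data> {
    // asFlow() is the Publisher extension from kotlinx.coroutines.reactive;
    // Flow.map takes a suspending lambda, so no scope parameter is needed
    return dao.observeData().asFlow().map {
        if (it.isEmpty()) {
            dao.insertData(api.fetchData())
        }
        it // pass the original emission through
    }
}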
But actually map is tricky and a bit funny, so using produce and a for-each loop on the dao might be simpler.
tseisel
07/24/2019, 6:48 AM
Replace doOnNext with concatMapSingle, passing a Single created with rxSingle from kotlinx-coroutines-rx2. You can call suspending functions from the body of rxSingle.
Dico
07/24/2019, 10:00 AM
Flow may be better
groostav
07/24/2019, 7:46 PM
Use Flow instead of Channel if you want to go that route.
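For reference, the two routes suggested in this thread (concatMapSingle with rxSingle, and converting to Flow) could be sketched side by side as below. This is a sketch under assumptions rather than a definitive implementation: it assumes RxJava 2 plus kotlinx-coroutines-rx2 (which brings in kotlinx-coroutines-reactive for asFlow), and Data, DataDao, DataApi, DataRepository, and observeDataAsFlow are hypothetical names introduced for illustration.

```kotlin
import io.reactivex.Flowable
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.rx2.rxSingle

// Hypothetical stand-ins for the types used in the question.
typealias Data = List<String>

interface DataDao {
    fun observeData(): Flowable<Data>
    suspend fun insertData(data: Data) // assumed to be suspending
}

interface DataApi {
    suspend fun fetchData(): Data
}

class DataRepository(private val dao: DataDao, private val api: DataApi) {

    // Rx route: concatMapSingle subscribes to one Single at a time, in order,
    // and the body of rxSingle may call suspending functions.
    fun observeData(): Flowable<Data> =
        dao.observeData().concatMapSingle { data ->
            rxSingle {
                if (data.isEmpty()) dao.insertData(api.fetchData())
                data // re-emit the original item once the side effect is done
            }
        }

    // Flow route: onEach takes a suspending lambda and passes each item through.
    fun observeDataAsFlow(): Flow<Data> =
        dao.observeData().asFlow().onEach { data ->
            if (data.isEmpty()) dao.insertData(api.fetchData())
        }
}
```

Neither version needs a suspend modifier or a CoroutineScope parameter. On the Rx route, disposing the subscription cancels the coroutine running inside rxSingle; on the Flow route, the work is cancelled together with whatever scope collects the flow, for example the view's scope.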