# coroutines
d
Hello, what is the recommended way to do this? I have an API layer that has suspend functions. My ViewModel implements
CoroutineScope
so I can correctly cancel jobs when my view is destroyed, and I use the repository pattern to fetch/store data. In one case, though, I use RxJava to observe database changes, and if there is no data in the database, I fetch it from the API. The method looks something like this:
override suspend fun observeData(): Flowable<Data> {
    return dao.observeData()
        .doOnNext {
            if (it.isEmpty()) {
                dao.insertData(api.fetchData())
            }
        }
}
However, as you may notice, this does not work because the
doOnNext
callback is not a suspend function, so I cannot call my API from it. I've been trying some approaches; the current one, which works but which I'm not sure is correct, is this:
override suspend fun observeData(scope: CoroutineScope): Flowable<Data> {
    return dao.observeData()
        .doOnNext {
            if (it.isEmpty()) {
                scope.launch {
                    dao.insertData(api.fetchData())
                }
            }
        }
}
Any better approaches? The easiest would be to implement
CoroutineScope
in my repository, but it does not have a clear, strict lifecycle, so I would have no point at which to cancel jobs when it is "ending".
g
This signature is really funky. 1.
suspend fun
and
returns Flowable<Data>
is redundant, since building a Flowable does not itself suspend. 2.
suspend fun a(scope: CoroutineScope)
: by convention you either declare a
suspend fun
or take a
CoroutineScope
to put jobs in, not both. But as you've written it, it should compile and run; it'll just be a little more asynchronous than you really need.
d
Yes, the
suspend
keyword is redundant on the
observeData
method. So is accepting the scope as a function parameter an accepted approach?
g
If you're looking for a quick fix: yeah,
scope.launch
should do the trick, but I would "default" to
runBlocking
, since it'll keep everything sequential, maybe using a Flowable combinator to put you on an IO thread first (assuming your API is IO related). A longer fix: it seems to me that if your API and view are both using coroutines, I would go all the way and convert your
Flowable<Data>
into a
ReceiveChannel<Data>
(or, better, a
kotlinx.coroutines.flow.Flow
), and I'd wager kotlinx.coroutines.rxjava has an extension function for that. Then your
observeData
becomes
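fun observeData(): Flow<Data> {
  // asFlow() is the Publisher extension from kotlinx-coroutines-reactive
  return dao.observeData().asFlow().map {
    if (it.isEmpty()) {
      // the lambda passed to map is suspending, so the API call is allowed here
      dao.insertData(api.fetchData())
    }
    it // pass the original emission downstream
  }
}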
but actually
map
is tricky and a bit funny, so using the
produce
builder and a for-each loop over
dao
might be simpler
t
Try replacing
doOnNext
with
concatMapSingle
, passing a
Single
created with
rxSingle
from
kotlinx-coroutines-rx2
. You can call suspending functions from the body of
rxSingle
.
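A minimal sketch of this suggestion, assuming RxJava 2 and a recent kotlinx-coroutines-rx2 where `rxSingle` is a top-level builder. The `Data` alias, the `Dao` and `Api` interfaces, and the `Repository` class are hypothetical stand-ins for the types in the question, and `insertData` is assumed to be a suspend function.

```kotlin
import io.reactivex.Flowable
import kotlinx.coroutines.rx2.rxSingle

// Hypothetical stand-ins for the types used in the question.
typealias Data = List<String>

interface Dao {
    fun observeData(): Flowable<Data>
    suspend fun insertData(data: Data) // assumed to be suspending
}

interface Api {
    suspend fun fetchData(): Data
}

class Repository(private val dao: Dao, private val api: Api) {
    // No suspend modifier and no scope parameter: the work runs per subscription.
    fun observeData(): Flowable<Data> =
        dao.observeData().concatMapSingle { data ->
            // rxSingle wraps a suspending block in a Single.
            rxSingle {
                if (data.isEmpty()) {
                    dao.insertData(api.fetchData())
                }
                data // re-emit the original value downstream
            }
        }
}
```

As far as I know, disposing the subscription also cancels the coroutine started by `rxSingle`, so the repository would not need a lifecycle of its own.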
d
Instead of converting to a ReceiveChannel, converting to
Flow
may be better
It could use the context of the collector to fetch the data
g
@Dico that is true, use
Flow
instead of
Channel
if you want to go that route.
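A rough sketch of the Flow route with the collector supplying the scope, assuming kotlinx-coroutines-reactive (pulled in by kotlinx-coroutines-rx2) is on the classpath. `Data`, `Dao`, `Api`, `FlowRepository`, and `showData` are hypothetical names used only for illustration.

```kotlin
import io.reactivex.Flowable
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.reactive.asFlow

// Hypothetical stand-ins for the types used in the question.
typealias Data = List<String>

interface Dao {
    fun observeData(): Flowable<Data>
    suspend fun insertData(data: Data) // assumed to be suspending
}

interface Api {
    suspend fun fetchData(): Data
}

class FlowRepository(private val dao: Dao, private val api: Api) {
    // The Flow is cold: nothing runs until someone collects it,
    // and the fetch runs in the collector's coroutine.
    fun observeData(): Flow<Data> =
        dao.observeData().asFlow().onEach { data ->
            if (data.isEmpty()) {
                dao.insertData(api.fetchData())
            }
        }
}

// Called from a scope that has a lifecycle, e.g. a ViewModel implementing CoroutineScope.
fun CoroutineScope.showData(repository: FlowRepository, render: (Data) -> Unit): Job =
    repository.observeData()
        .onEach { render(it) }
        .launchIn(this) // cancelling the scope cancels collection and any fetch in flight
```

With this shape the repository needs neither a `suspend` modifier nor a scope parameter; cancellation follows whichever scope collects the Flow.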