Tower Guidev2
12/16/2020, 12:48 PMkotlinx.coroutines.channels.actor
in my Android `androidx.work.CoroutineWorker`(s).
What I am trying to achieve is that my Worker makes multiples calls to a remote API endpoint
and each API response is sent to an actor channel to be processed on another thread so that I can immediatley
start the API call for subsequent pages of data
I have these message types
sealed class Message {
class Responsible(val value: Response<List<MyApiData>>) : Message()
class GetValue(val deferred: CompletableDeferred<Int>) : Message()
}
This Actor
fun CoroutineScope.persistActor() = actor<Message>(context = Dispatchers.Unconfined, capacity = Channel.UNLIMITED) {
for (msg in channel) {
when (msg) {
is Message.Responsible -> managePage(msg.value)
is Message.GetValue -> msg.deferred.complete(1)
}
}
}
private lateinit var persister: SendChannel<Message>
each page returned from my API is processed by this recursively called function:-
private suspend fun managePages(accessToken: String, response: Response<List<MyApiData>>) {
when {
result != null -> return
response.isSuccessful -> persister.send(Message.Responsible(response))
else -> {
manageError(response.errorBody())
result = Result.failure()
return
}
}
response.headers().filter { it.first == HTTP_HEADER_LINK && it.second.contains(REL_NEXT) }.forEach {
val parts = it.second.split(OPEN_ANGLE, CLOSE_ANGLE)
if (parts.size >= 2) managePages(accessToken, service.documents(accessToken, parts[1]))
}
}
Once all the pages have been retrieved from the remote end point I execute the following code to await the actor to complete persisting the returned data
val completed = CompletableDeferred<Int>()
persister.send(Message.GetValue(completed))
println("Counter = ${completed.await()}")
persister.close()
what I am concerned about
1). is this a "good" approach?
2). Does my worker wait for all data to be persisted before completing?
3). What improvements could I make?gildor
12/16/2020, 1:22 PMTower Guidev2
12/16/2020, 1:46 PMRob
12/16/2020, 3:51 PMTower Guidev2
12/16/2020, 4:18 PMRob
12/16/2020, 4:25 PMgildor
12/17/2020, 1:09 AMwhile
is enough, like any non-suspend code, not sure why you need actor for thisfun allPages(): = flow<Page> {
val firstPage = api.loadFirstPage() // suspend call to API
emit(firstPage)
var nextPageLink = firstPage.nextPageLink
while(nextPageLink != null) {
val currentPage = api.loadPage(nextPageLink) // suspend call to API
nextPageLink = currentPage.nextPageLink
emit(firstPage)
}
}
allPages().collect { doSomethingWith(it) }
// or if you want get all data too:
allPages().onEach {
// Still possible to access every page
doSomethingWith(it)
}.toList() // but as result collect all the data as List<Page> and await result
Tower Guidev2
12/17/2020, 9:29 AMgildor
12/18/2020, 12:28 AM