#coroutines

Tower Guidev2

12/16/2020, 12:48 PM
I'm investigating the use of coroutines in my current Android project, specifically `kotlinx.coroutines.channels.actor` in my Android `androidx.work.CoroutineWorker`(s). What I am trying to achieve is that my Worker makes multiple calls to a remote API endpoint and each API response is sent to an actor channel to be processed on another thread, so that I can immediately start the API call for subsequent pages of data.
I have these message types:
sealed class Message {
    class Responsible(val value: Response<List<MyApiData>>) : Message()
    class GetValue(val deferred: CompletableDeferred<Int>) : Message()
}
This actor:
fun CoroutineScope.persistActor() = actor<Message>(context = Dispatchers.Unconfined, capacity = Channel.UNLIMITED) {
    for (msg in channel) {
        when (msg) {
            is Message.Responsible -> managePage(msg.value)
            is Message.GetValue -> msg.deferred.complete(1)
        }
    }
}
private lateinit var persister: SendChannel<Message>
Each page returned from my API is processed by this recursively called function:
private suspend fun managePages(accessToken: String, response: Response<List<MyApiData>>) {
    when {
        result != null -> return
        response.isSuccessful -> persister.send(Message.Responsible(response))
        else -> {
            manageError(response.errorBody())
            result = Result.failure()
            return
        }
    }
    response.headers().filter { it.first == HTTP_HEADER_LINK && it.second.contains(REL_NEXT) }.forEach {
        val parts = it.second.split(OPEN_ANGLE, CLOSE_ANGLE)
        if (parts.size >= 2) managePages(accessToken, service.documents(accessToken, parts[1]))
    }
}
Once all the pages have been retrieved from the remote endpoint, I execute the following code to wait for the actor to finish persisting the returned data:
val completed = CompletableDeferred<Int>()
persister.send(Message.GetValue(completed))
println("Counter = ${completed.await()}")
persister.close()
What I am concerned about:
1) Is this a "good" approach?
2) Does my worker wait for all data to be persisted before completing?
3) What improvements could I make?

gildor

12/16/2020, 1:22 PM
It's not clear to me why you need an actor; just launching multiple jobs using launch from a coroutineScope block should be enough (or async + awaitAll()). The scope will wait for all of them to complete.
Your current approach looks overcomplicated and quite brittle. But maybe there are some additional requirements. It would be nice to see an example of what you are trying to achieve, because using CoroutineWorker is no different from using any other suspend function.
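The coroutineScope suggestion above can be sketched roughly as follows. This is a minimal sketch that assumes the API calls are independent of each other; `fetchItems` and `fetchAll` are hypothetical names, and the delay only simulates network latency.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a suspend API call (e.g. a Retrofit service method).
suspend fun fetchItems(page: Int): List<String> {
    delay(100) // simulated network latency
    return List(3) { "page$page-item$it" }
}

// coroutineScope does not return until every child coroutine has completed,
// so a caller such as CoroutineWorker.doWork() waits for all of the work.
suspend fun fetchAll(pages: List<Int>): List<String> = coroutineScope {
    pages.map { page -> async { fetchItems(page) } } // start all calls concurrently
        .awaitAll()                                   // suspend until all have finished
        .flatten()
}

fun main() = runBlocking {
    println(fetchAll(listOf(1, 2, 3)).size)
}
```

awaitAll() returns results in the same order as the input list, regardless of which call finishes first.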

Tower Guidev2

12/16/2020, 1:46 PM
My API endpoint returns a maximum of 500 data items per page. These API calls can take up to 1.5 seconds to return. While waiting for the NEXT page to return, I can be inserting the PREVIOUS page's data.

Rob

12/16/2020, 3:51 PM
The actor pattern is best used when state needs to be synchronized from events generated asynchronously. Here is what I'm using for my concurrent API calls.

Tower Guidev2

12/16/2020, 4:18 PM
I need to extract the NEXT HTTP header Link from Call1 before I can make Call2, and I cannot make Call3 until I have the Link from Call2, etc.
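For illustration, extracting the next-page URL from a Link header value might look like the sketch below. The `managePages` function earlier in the thread splits on angle brackets and takes `parts[1]`; a regex is swapped in here instead, and `nextLink` is a hypothetical helper name. It assumes the header follows the usual `<url>; rel="next"` shape.

```kotlin
// Hypothetical helper: returns the URL tagged rel="next" in a Link header value
// such as: <https://example.com/docs?page=2>; rel="next", <...>; rel="last"
// Returns null when the header is missing or has no rel="next" entry.
fun nextLink(linkHeader: String?): String? {
    if (linkHeader == null) return null
    // <url> followed by parameters (no comma, so we stay within one entry) containing rel="next"
    val pattern = Regex("""<([^>]*)>\s*;[^,]*\brel="?next\b""")
    return pattern.find(linkHeader)?.groupValues?.get(1)
}

fun main() {
    val header = "<https://example.com/docs?page=2>; rel=\"next\", " +
        "<https://example.com/docs?page=9>; rel=\"last\""
    println(nextLink(header))
}
```

A null result is a natural stop condition for a loop that requests one page after another.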

Rob

12/16/2020, 4:25 PM
I see. Then you would get the first result for the next call like this.
I think this works as well, without the need to jump back and forth between Main and IO threads between calls.
If it wasn’t clear from the examples, the apiCall1() functions are suspend functions on Retrofit services.

gildor

12/17/2020, 1:09 AM
Still not quite sure. If you just need the result of the previous operation, a simple `while` loop is enough, like in any non-suspend code; not sure why you need an actor for this.
If you need intermediate results (so it's a stream of data), just create a flow; it's a lot easier to implement and use:
fun allPages(): Flow<Page> = flow {
    val firstPage = api.loadFirstPage() // suspend call to API
    emit(firstPage)
    var nextPageLink = firstPage.nextPageLink
    while (nextPageLink != null) {
        val currentPage = api.loadPage(nextPageLink) // suspend call to API
        nextPageLink = currentPage.nextPageLink
        emit(currentPage) // emit the page just loaded, not the first page again
    }
}
So now you can use it any way you want:
allPages().collect { doSomethingWith(it) }

// or, if you also want to get all the data:
allPages().onEach {
    // Still possible to access every page
    doSomethingWith(it)
}.toList() // collects all pages into a List<Page> and awaits the result
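One detail that matters for the goal stated earlier (inserting the previous page while the next one loads): a plain flow runs sequentially, so `emit` suspends until the collector has finished with the page. Adding `buffer()` lets the producer keep fetching while the collector persists. The sketch below assumes kotlinx.coroutines 1.6 or newer; `Page`, `loadPage` and `persist` are hypothetical stand-ins with simulated delays, not the real API or database code.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical page type: nextPageLink is null on the last page.
data class Page(val number: Int, val nextPageLink: Int?)

// Hypothetical stand-in for the suspend API call.
suspend fun loadPage(number: Int): Page {
    delay(300) // simulated network latency
    return Page(number, if (number < 3) number + 1 else null)
}

// Hypothetical stand-in for the database insert.
suspend fun persist(page: Page) {
    delay(300) // simulated insert time
}

fun allPages(): Flow<Page> = flow {
    var page = loadPage(1)
    emit(page)
    while (true) {
        val next = page.nextPageLink ?: break // stop after the last page
        page = loadPage(next)
        emit(page)
    }
}

fun main() = runBlocking {
    // Without buffer(): each load waits for the previous persist to finish.
    val sequential = measureTimeMillis { allPages().collect { persist(it) } }
    // With buffer(): producer and collector run in separate coroutines,
    // so the next load overlaps with persisting the previous page.
    val overlapped = measureTimeMillis { allPages().buffer().collect { persist(it) } }
    println("sequential=${sequential}ms overlapped=${overlapped}ms")
}
```

In both cases `collect` returns only after every page has been persisted, so the caller still waits for all of the work.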

Tower Guidev2

12/17/2020, 9:29 AM
The Flow approach does look promising, thanks very much for taking the time to look at this 😄

gildor

12/18/2020, 12:28 AM
It's not much different from using a callback, just more idiomatic and flexible.