#flow

Diego

11/03/2021, 8:18 PM
Based on a list of ids I need to retrieve a list of users with detailed information about each user. Below is a simplification of my models and API:
data class UserDTO(val id: String)
data class PetDTO(val name: String)

interface Api {
  suspend fun users(): List<UserDTO>
  suspend fun pets(userId: String): List<PetDTO>
}

data class User(
  val id: String,
  val pets: List<Pet>
)

data class Pet(val name: String)

class UsersRepository(api: Api) {
  val users: Flow<List<User>> = TODO()
}
In RxJava I would do something like this:
val users: Observable<List<User>> = api.users()
      .flatMapIterable { it }
      .concatMap { userDto ->
        api.pets(userDto.id)
          .map { petsListDto ->
            User(userDto.id, petsListDto.map { Pet(it.name) })
          }
      }.toList()
How can I implement `UsersRepository` and return the list of `User`s using Kotlin Flow?

Nick Allen

11/03/2021, 11:50 PM
Wrote an answer but, actually, I'm a little confused: why is UsersRepository.users returning a Flow? The underlying API doesn't use any Flow.
Do you actually just want a suspend function?
class UsersRepository(private val api: Api) {
    suspend fun users(): List<User> = api.users().map { userDto ->
        User(userDto.id, api.pets(userDto.id).map { petDto -> Pet(petDto.name) })
    }
}
Where you would use a Single or Completable in RxJava, you just use a suspend function in Kotlin coroutines.
For the RxJava snippet, you wrote:
val users: Observable<List<User>>
but I suspect you meant:
val users: Single<List<User>>
based on the Api interface and on the expression ending with toList(), which returns a Single.

Diego

11/04/2021, 10:57 AM
The repository returns a Flow because behind the scenes I have to poll to keep sending 'real time' updates.
I ended up writing the solution below. It works, but I'm not sure whether there is a better way; ideally I would send the requests in parallel to make everything faster.
class UsersRepository(api: Api) {
    val users: Flow<List<User>> = flow {
        val usersWithPets = api.users().map { userDto ->
            val pets = api.pets(userDto.id).map { petsListDto ->
                Pet(petsListDto.name)
            }
            User(userDto.id, pets)
        }
        emit(usersWithPets)
    }
}
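The polling mentioned above is not shown in the thread. One possible shape, as a minimal sketch only: a cold flow that calls a fetch function, emits the result, waits, and repeats until the collector is cancelled. `pollingFlow`, `periodMillis`, and `fetch` are hypothetical names for illustration; in the repository, `fetch` would be the code that builds the list of users.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking

// Hypothetical helper: re-runs `fetch` every `periodMillis` and emits each result.
// The loop ends when the collector stops collecting (cancellation).
fun <T> pollingFlow(periodMillis: Long, fetch: suspend () -> T): Flow<T> = flow {
    while (true) {
        emit(fetch())
        delay(periodMillis)
    }
}

fun main() = runBlocking {
    var counter = 0
    // Stand-in fetch that just counts calls; take(3) stops the polling loop.
    pollingFlow(periodMillis = 100) { ++counter }
        .take(3)
        .collect { println("poll result: $it") }
}
```

With this shape the flow emits more than one item over time, which is the case where returning a Flow rather than a suspend function pays off.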

gts13

11/04/2021, 3:21 PM
imo since your API doesn’t return a flow, your suggested solution is definitely fine! But if you also have a (Room) database, and you first store the retrieved lists of pets and users, then you could write a query that returns the flow you need.

Nick Allen

11/04/2021, 4:12 PM
You only emit one item. In this case, I'd really recommend just using a suspend method, not a `Flow`. You wouldn't return a `List` if you only have one result in a synchronous method, so why would you return a `Flow` here? If you then want to bring it into a `Flow` expression, you can do `repository::users.asFlow()`.
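A small sketch of that last point, assuming a repository that exposes a plain suspend function. `Repository` and its hard-coded result are hypothetical stand-ins, and `User` is trimmed to a single field. Depending on the kotlinx.coroutines version, `asFlow()` on a suspend function reference may produce a `@FlowPreview` opt-in warning.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

data class User(val id: String)

// Hypothetical repository with a suspend function instead of a Flow property.
class Repository {
    suspend fun users(): List<User> = listOf(User("1"), User("2"))
}

fun main() = runBlocking {
    val repository = Repository()
    // Wraps the suspend function in a cold Flow that emits its single result.
    val usersFlow: Flow<List<User>> = repository::users.asFlow()
    println(usersFlow.toList())
}
```

The function is only invoked when the flow is collected, so the wrapper stays cold like any other `flow { ... }` builder.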
If you want to do tasks at the same time, you need to use a method like `launch` or `async`. Here's your code adapted to use async.
class UsersRepository(api: Api) {
    val users: Flow<List<User>> = flow {
        val usersWithPets = coroutineScope {
            val userIdsWithPetsTasks = api.users().map { userDto ->
                val petsTask = async {
                    api.pets(userDto.id).map { petsListDto ->
                        Pet(petsListDto.name)
                    }
                }
                userDto.id to petsTask
            }
            userIdsWithPetsTasks.map { (userId, petsTask) -> 
                User(userId, petsTask.await())
            }
        }
        emit(usersWithPets)
    }
}
The `api.users().map { ... }` starts a task to get the pets for each user. `userIdsWithPetsTasks.map { ... }` waits for the results for each user. Be careful how many `api.pets` calls you are making at the same time if your underlying API doesn't limit concurrency somehow for you. You can limit concurrency with a Semaphore, or using fan-out. Side note: `emit` is outside `coroutineScope` on purpose; you need `channelFlow { ... }` to emit from a coroutine other than the one the lambda is running in.
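As a rough sketch of the Semaphore suggestion above: each `async` task takes a permit before calling the API, so at most `maxConcurrent` requests are in flight at once. `fetchPets`, `loadUsers`, and `maxConcurrent` are hypothetical names for illustration, and `fetchPets` is a fake stand-in for `api.pets`.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

data class PetDTO(val name: String)
data class Pet(val name: String)
data class User(val id: String, val pets: List<Pet>)

// Hypothetical stand-in for api.pets(userId); the delay simulates a network call.
suspend fun fetchPets(userId: String): List<PetDTO> {
    delay(10)
    return listOf(PetDTO("pet-of-$userId"))
}

// Starts one task per user, but only `maxConcurrent` tasks may call fetchPets at a time.
suspend fun loadUsers(userIds: List<String>, maxConcurrent: Int): List<User> = coroutineScope {
    val semaphore = Semaphore(maxConcurrent)
    userIds.map { id ->
        async {
            val pets = semaphore.withPermit { fetchPets(id) }
            User(id, pets.map { Pet(it.name) })
        }
    }.awaitAll() // results come back in the same order as userIds
}

fun main() = runBlocking {
    println(loadUsers(listOf("a", "b", "c"), maxConcurrent = 2))
}
```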
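For the `channelFlow { ... }` side note, a minimal sketch of emitting from child coroutines: inside `channelFlow`, values are sent with `send` from any launched coroutine, which plain `flow { ... }` does not allow. `fetchPetNames` and `petsPerUser` are hypothetical names, and `fetchPetNames` is a fake stand-in for `api.pets`.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for api.pets(userId); the delay simulates a network call.
suspend fun fetchPetNames(userId: String): List<String> {
    delay(10)
    return listOf("pet-of-$userId")
}

// Emits one (userId, petNames) pair per user as soon as each request finishes.
fun petsPerUser(userIds: List<String>): Flow<Pair<String, List<String>>> = channelFlow {
    for (id in userIds) {
        launch {
            // send is safe to call from a coroutine other than the builder's own.
            send(id to fetchPetNames(id))
        }
    }
    // The flow completes once all launched children have finished.
}

fun main() = runBlocking {
    // Arrival order depends on which request finishes first, so sort before printing.
    val results = petsPerUser(listOf("a", "b")).toList()
    println(results.sortedBy { it.first })
}
```

Unlike the `async` version, this variant emits partial results as they arrive instead of one complete list at the end.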

Diego

11/05/2021, 4:35 PM
Thank you guys!