https://kotlinlang.org logo
Title
a

ahulyk

08/06/2019, 10:16 AM
Retrofit does not support Flow. So if I need to merge concurrent requests using 'flatMapMerge' what is the right way to do this?
d

Dominaezzz

08/06/2019, 10:22 AM
You don't need
Flow
for this.
You just need
List<Deferred<T>>.awaitAll()
.
You can do something like,
val requests = listOf(
    async { api.getUser() },
    async { api.getComments() }
)
val responses = requests.awaitAll()
l

louiscad

08/06/2019, 10:29 AM
Also, make sure to wrap these in a local
coroutineScope { … }
and handle errors around it.
a

ahulyk

08/06/2019, 11:53 AM
Flow
has great API and I need
ConflatedBroadcastChannel
for some reasons. What i did:
flowOf(StreamType.values().asFlow()
                        .flatMapMerge(10) { stream ->
                            flowOf(stream to async { apiService.getCurrentInfo(stream.path) })
                        }
                        .map { Pair(it.first, it.second.await()) }
                        .fold(mutableMapOf(), { map: MutableMap<StreamType, ContentMetadataDto>, pair: Pair<StreamType, ContentMetadataDto> ->
                            map.apply { put(pair.first, pair.second) }
                        }))
                        .onEach { map ->
                            contentMetadata.send(map)
                        }
                        .catch {
                            emit(StreamType.values().map { it to ContentMetadataDto.NO_DATA }.toMap(mutableMapOf()))
                        }
                        .collect {
                            contentMetadata.send(it)
                        }
d

Dominaezzz

08/06/2019, 12:02 PM
I think you can replace the first
flowOf
with
StreamType.values().associateWith { stream -> async { apiService.getCurrentInfo(stream.path) } }.mapValues { it.second.await() }
(I'm not sure, it's hard to read).
flowOf(StreamType.values().asFlow()
    .flatMapMerge(10) { stream ->
        flowOf(stream to async { apiService.getCurrentInfo(stream.path) })
    }
    .associate { Pair(it.first, it.second.await()) }))
    .onEach { map ->
         contentMetadata.send(map)
    }
    .catch {
         emit(StreamType.values().associateWith { ContentMetadataDto.NO_DATA }
    }
    .collect {
        contentMetadata.send(it)
    }
a

ahulyk

08/06/2019, 12:10 PM
yeap, but this works fine - thank you)
d

Dico

08/06/2019, 12:53 PM
Also you can use
associateWith