Ahmed
01/17/2025, 10:04 AMAhmed
01/17/2025, 10:04 AM@Stable
sealed interface ResultState<out T> {
@Stable
data class Success<T>(val data: T) : ResultState<T>
@Immutable
data class Error(val exception: Throwable) : ResultState<Nothing>
@Immutable
data object Loading : ResultState<Nothing>
}
inline fun <I, O> Flow<ResultState<I>>.mapResultState(
crossinline transform: suspend (I) -> O,
): Flow<ResultState<O>> = map { result ->
return@map when (result) {
ResultState.Loading -> ResultState.Loading
is ResultState.Error -> ResultState.Error(result.exception)
is ResultState.Success -> ResultState.Success(transform(result.data))
}
}
inline fun <I, O> Flow<ResultState<I>>.flatMapResultState(
crossinline transform: suspend (I) -> Flow<ResultState<O>>,
): Flow<ResultState<O>> = flow {
collect { result ->
when (result) {
ResultState.Loading -> Unit
is ResultState.Error -> emit(ResultState.Error(result.exception))
is ResultState.Success -> transform(result.data)
}
}
}
Ahmed
01/17/2025, 10:05 AMResultState
Ahmed
01/17/2025, 10:06 AMflatMapResultState
is for cases when the ResultState
of first api call should start another api call.Ahmed
01/17/2025, 10:08 AMfun myChainedCall(): Flow<ResultState<List<MyModel>>> = firstCall().flatMapResultState { response ->
val results = response.myData
secondCall(input = results).mapResultState { data ->
// My mapping logic is here
}
}
}
Ahmed
01/17/2025, 10:09 AMAhmed
01/17/2025, 10:09 AMflatMapResultState
does get triggeredAhmed
01/17/2025, 10:09 AMAhmed
01/17/2025, 10:10 AMmapResultState
also gets triggered and shows that it indeed has the data that can be transformedAhmed
01/17/2025, 10:16 AMSzymon Jeziorski
01/17/2025, 10:43 AMis ResultState.Success -> transform(result.data)
You're not doing anything with the result flow, I think you missed emitAll
thereAhmed
01/17/2025, 11:04 AMemitAll
. But still unable to collect anything.
flatMapResultState
is triggered, I get the result but nothing moves downstreamSzymon Jeziorski
01/17/2025, 11:22 AMmyChainedCall
invoked?
Your code seems to work just fine for me. Here's some sample I've tested on:
fun firstCall() = flow {
delay(500)
emit(ResultState.Loading.also { println("Emitted $it on first call") })
delay(500)
emit(ResultState.Success(listOf("one_one", "one_two").also { println("Emitted $it on first call") }))
delay(500)
emit(ResultState.Error(IllegalStateException("some error")).also { println("Emitted $it on first call") })
delay(500)
emit(ResultState.Loading.also { println("Emitted $it on first call") })
delay(500)
emit(ResultState.Success(listOf("two_one")).also { println("Emitted $it on first call") })
}
fun secondCall(inputs: List<String>) = flow {
inputs.forEach { input ->
delay(500)
emit(ResultState.Success(input).also { println("Emitted $it on second call") })
delay(500)
emit(ResultState.Success("${input}_with_second_call_mapping").also { println("Emitted $it on second call") })
}
}
fun myChainedCall(): Flow<ResultState<String>> = firstCall().flatMapResultState { response ->
val results = response
secondCall(inputs = results).mapResultState { data -> "mapped_$data" }
}
suspend fun main() {
myChainedCall().collect { println("Collected $it\n") }
}
Ahmed
01/17/2025, 11:23 AMAhmed
01/17/2025, 11:23 AMSergey Dmitriev
01/17/2025, 1:27 PMemitAll
?
What are you expecting?Ahmed
01/17/2025, 1:41 PMAhmed
01/17/2025, 1:42 PMSergey Dmitriev
01/17/2025, 1:45 PMflatMapLatest
operator 🙂Ahmed
01/17/2025, 4:05 PMSergey Dmitriev
01/17/2025, 5:33 PMflatMapResultState
to an infinite flow, because emitAll
will suspend until the passed flow is completed and only then resume to collect next inputs
btw I believe it’s totally safe to use flatMapLatest
, the reason it is still experimental is that there is no agreement on some implementation details for some cases, more details here: https://github.com/Kotlin/kotlinx.coroutines/issues/3168Ahmed
01/17/2025, 8:18 PMflatMapResultState
then?
I don’t completely understand this
Do you mean that if there were more values in the upstream, they will have to wait forwill suspend until the passed flow is completed and only then resume to collect next inputs.emitAll
emitAll
to complete?
In my particular use case ResultState.Success
and ResultState.Error
are terminal i.e. they are always the last value emitted and they only emit once.
infinite flowI don’t understand this but I’m happy to learn more about it and take your design input.
Sergey Dmitriev
01/20/2025, 12:22 PMemitAll
to complete?
Exactly, they will wait until the downstream completes
> In my particular use case ResultState.Success
and ResultState.Error
are terminal i.e. they are always the last value emitted and they only emit once.
I of course can’t know your exact usecase, however what you are describing sounds like flatMapConcat
(also experimental), however to me it feels like you need flatMapLatest
but that depends on what you want to do if the upstream emits a new value while the previous one is still processing.
> I don’t understand this but I’m happy to learn more about it and take your design input.
An example of infinite flow could be a stream of user’s LatLng
It never ends, so if you do something like
originalStream.flatMapResultState {
userLatLngStream.map { /* transform */ }
}
It’s not gonna work, since userLatLngStream
will never finish emitAll
Ahmed
01/20/2025, 12:30 PMAn example of infinite flow could be a stream of user’sDoes this mean thatLatLng
LatLng
is a hot flow? Also, the change that I need to make is replace emitAll
to flatMapLatest
, right?Sergey Dmitriev
01/20/2025, 12:36 PMLatLng
is a hot flow
Nope it doesn’t have to be, it can be cold or hot but still infinite,
For example consider this cold infinite flow:
flow {
while(true) {
delay(1000)
emit(generateNextRandomValue())
}
}
> Also, the change that I need to make is replace emitAll
to flatMapLatest
, right?
You can try something like:
inline fun <I, O> Flow<ResultState<I>>.flatMapResultState(
crossinline transform: suspend (I) -> Flow<ResultState<O>>,
): Flow<ResultState<O>> = flatMapLatest { result ->
when (result) {
is ResultState.Loading -> flowOf(ResultState.Loading)
is ResultState.Error -> flowOf(ResultState.Error(result.exception))
is ResultState.Success -> transform(result.data)
}
}