For some reason the flow doesn't work as expected ...
# codereview
a
For some reason the flow doesn't work as expected for my use case. More in 🧵
I have this file
Copy code
@Stable
sealed interface ResultState<out T> {
    @Stable
    data class Success<T>(val data: T) : ResultState<T>

    @Immutable
    data class Error(val exception: Throwable) : ResultState<Nothing>

    @Immutable
    data object Loading : ResultState<Nothing>
}

inline fun <I, O> Flow<ResultState<I>>.mapResultState(
    crossinline transform: suspend (I) -> O,
): Flow<ResultState<O>> = map { result ->
    return@map when (result) {
        ResultState.Loading -> ResultState.Loading
        is ResultState.Error -> ResultState.Error(result.exception)
        is ResultState.Success -> ResultState.Success(transform(result.data))
    }
}

inline fun <I, O> Flow<ResultState<I>>.flatMapResultState(
    crossinline transform: suspend (I) -> Flow<ResultState<O>>,
): Flow<ResultState<O>> = flow {
    collect { result ->
        when (result) {
            ResultState.Loading -> Unit
            is ResultState.Error -> emit(ResultState.Error(result.exception))
            is ResultState.Success -> transform(result.data)
        }
    }
}
My api calls return
ResultState
flatMapResultState
is for cases when the
ResultState
of first api call should start another api call.
The rough usage example is
Copy code
fun myChainedCall(): Flow<ResultState<List<MyModel>>> = firstCall().flatMapResultState { response ->
        val results = response.myData

        secondCall(input = results).mapResultState { data ->
                // My mapping logic is here
            }
        }
    }
I am unsure what I am missing here …
flatMapResultState
does get triggered
But nothing is collected ...
mapResultState
also gets triggered and shows that it indeed has the data that can be transformed
I could be missing something super obvious but I am unable to pinpoint it for now
s
from the quick glance this line doesn't look right to me:
Copy code
is ResultState.Success -> transform(result.data)
You're not doing anything with the result flow, I think you missed
emitAll
there
a
I have made the change. The behaviour remains the same. I did miss out on
emitAll
. But still unable to collect anything.
flatMapResultState
is triggered, I get the result but nothing moves downstream
s
Can you share some bigger sample with
myChainedCall
invoked? Your code seems to work just fine for me. Here's some sample I've tested on:
Copy code
fun firstCall() = flow {
    delay(500)
    emit(ResultState.Loading.also { println("Emitted $it on first call") })

    delay(500)
    emit(ResultState.Success(listOf("one_one", "one_two").also { println("Emitted $it on first call") }))

    delay(500)
    emit(ResultState.Error(IllegalStateException("some error")).also { println("Emitted $it on first call") })

    delay(500)
    emit(ResultState.Loading.also { println("Emitted $it on first call") })

    delay(500)
    emit(ResultState.Success(listOf("two_one")).also { println("Emitted $it on first call") })
}


fun secondCall(inputs: List<String>) = flow {
    inputs.forEach { input ->
        delay(500)
        emit(ResultState.Success(input).also { println("Emitted $it on second call") })
        delay(500)
        emit(ResultState.Success("${input}_with_second_call_mapping").also { println("Emitted $it on second call") })
    }
}

fun myChainedCall(): Flow<ResultState<String>> = firstCall().flatMapResultState { response ->
    val results = response

    secondCall(inputs = results).mapResultState { data -> "mapped_$data" }
}

suspend fun main() {
    myChainedCall().collect { println("Collected $it\n") }
}
a
I have also tested, and it does work fine. Idk what's going wrong
This is the playground code https://pl.kotl.in/dNQSicFkZ
s
What doesn’t work exactly after you’ve added
emitAll
? What are you expecting?
a
I am an idiot. I had to clean and rebuild the app and it works flawlessly now.
😄 1
Thank you @Szymon Jeziorski
s
Nice! However I’d advice to look into built-in
flatMapLatest
operator 🙂
a
@Sergey Dmitriev avoiding @optin experimental
s
Then make sure you don’t
flatMapResultState
to an infinite flow, because
emitAll
will suspend until the passed flow is completed and only then resume to collect next inputs btw I believe it’s totally safe to use
flatMapLatest
, the reason it is still experimental is that there is no agreement on some implementation details for some cases, more details here: https://github.com/Kotlin/kotlinx.coroutines/issues/3168
a
@Sergey Dmitriev Ah! I see. What changes would be more appropriate for
flatMapResultState
then? I don’t completely understand this
emitAll
will suspend until the passed flow is completed and only then resume to collect next inputs.
Do you mean that if there were more values in the upstream, they will have to wait for
emitAll
to complete? In my particular use case
ResultState.Success
and
ResultState.Error
are terminal i.e. they are always the last value emitted and they only emit once.
infinite flow
I don’t understand this but I’m happy to learn more about it and take your design input.
s
> Do you mean that if there were more values in the upstream, they will have to wait for
emitAll
to complete? Exactly, they will wait until the downstream completes > In my particular use case
ResultState.Success
and
ResultState.Error
are terminal i.e. they are always the last value emitted and they only emit once. I of course can’t know your exact usecase, however what you are describing sounds like
flatMapConcat
(also experimental), however to me it feels like you need
flatMapLatest
but that depends on what you want to do if the upstream emits a new value while the previous one is still processing. > I don’t understand this but I’m happy to learn more about it and take your design input. An example of infinite flow could be a stream of user’s
LatLng
It never ends, so if you do something like
Copy code
originalStream.flatMapResultState {
  userLatLngStream.map { /* transform */ }
}
It’s not gonna work, since
userLatLngStream
will never finish
emitAll
✅ 1
a
An example of infinite flow could be a stream of user’s
LatLng
Does this mean that
LatLng
is a hot flow? Also, the change that I need to make is replace
emitAll
to
flatMapLatest
, right?
s
> Does this mean that
LatLng
is a hot flow Nope it doesn’t have to be, it can be cold or hot but still infinite, For example consider this cold infinite flow:
Copy code
flow {
  while(true) {
    delay(1000)
    emit(generateNextRandomValue())
  }
}
> Also, the change that I need to make is replace
emitAll
to
flatMapLatest
, right? You can try something like:
Copy code
inline fun <I, O> Flow<ResultState<I>>.flatMapResultState(
    crossinline transform: suspend (I) -> Flow<ResultState<O>>,
): Flow<ResultState<O>> = flatMapLatest { result ->
    when (result) {
        is ResultState.Loading -> flowOf(ResultState.Loading)
        is ResultState.Error -> flowOf(ResultState.Error(result.exception))
        is ResultState.Success -> transform(result.data)
    }
}
✅ 1