#coroutines

svenjacobs

04/30/2020, 9:49 AM
Hey everybody, I'm implementing the Model-View-Intent pattern using coroutines and `Flow`. In my concept everything is a `Flow`, beginning with UI events. Let's say I have a view interface like this:
interface MyView {

    val buttonClicks: Flow<Action.ButtonClick>

    val button2Clicks: Flow<Action.Button2Click>

    fun setMessage(message: String)
}
I have actions
sealed class Action {

    object ButtonClick : Action()

    object Button2Click : Action()
}
and a state that holds UI state
data class State(
    val message: String
)
The idea is that a `Flow<Action>` is transformed into a `Flow<State>` based on the current Action and that state changes are then delegated to the view. The flow process, extremely simplified for this example, would look like this:
val flow =
    merge(
        view.buttonClicks,
        view.button2Clicks
    ).flatMapMerge { action ->
        when (action) {
            is Action.ButtonClick -> flow {
                // Simulate Flow that constantly produces values

                var increment = 0

                while (true) {
                    delay(1000)
                    increment++

                    emit(State(message = "Message $increment"))
                }
            }

            is Action.Button2Click -> flowOf(State(message = "Hello world"))
        }
    }.onEach { state ->
        view.setMessage(state.message)
    }

// Flow is collected when view is ready
GlobalScope.launch {
    flow.collect()
}
However, I have a problem with this concept. Let's say `Action.ButtonClick` should subscribe to some API that constantly produces values which update the UI (simulated in the example above). Unfortunately, when multiple button clicks occur, multiple (inner) flows are running in parallel, each unnecessarily updating the UI. Do you have an idea how I can solve this problem, somehow stopping the previous (inner) flow? Thanks for your help!
Thought about replacing `flatMapMerge` with `flatMapLatest`. This works in case of multiple `Action.ButtonClick` but an `Action.Button2Click` would also cancel the flow...
I think what I would need is a `flatMapReplace` or something like that 😅
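The cancellation problem described above can be reproduced in isolation. The following is a minimal sketch, assuming a recent kotlinx.coroutines version; `demoActions`, `states`, and all timings are made up for illustration and are not part of the original code. It swaps `flatMapMerge` for `flatMapLatest` and shows that a `Button2Click` stops the ticker started by `ButtonClick`.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

sealed class Action {
    object ButtonClick : Action()
    object Button2Click : Action()
}

data class State(val message: String)

// Hypothetical, finite stand-in for the merged view events.
fun demoActions(): Flow<Action> = flow {
    emit(Action.ButtonClick)
    delay(250)
    emit(Action.Button2Click)
    delay(250)
}

@OptIn(ExperimentalCoroutinesApi::class)
fun states(actions: Flow<Action>): Flow<State> =
    actions.flatMapLatest { action ->
        when (action) {
            // Ticker shortened to 100 ms so the demo finishes quickly.
            is Action.ButtonClick -> flow {
                var increment = 0
                while (true) {
                    delay(100)
                    increment++
                    emit(State(message = "Message $increment"))
                }
            }

            is Action.Button2Click -> flowOf(State(message = "Hello world"))
        }
    }

fun main() = runBlocking {
    // Once Button2Click arrives, flatMapLatest cancels the ticker,
    // so no further "Message N" values follow "Hello world".
    println(states(demoActions()).toList().map { it.message })
}
```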

uli

05/02/2020, 1:02 PM
This is typically handled in the UI. You can disable the buttons while the network request is in flight.

wasyl

05/02/2020, 1:05 PM
Are view events supposed to emit new state that’s based on some previous state? Or is each view action completely separate, and you only merge them to have a single place with all handling?

svenjacobs

05/02/2020, 1:17 PM
Maybe the button click was a bad example. I'm really looking for a solution where I can stop/replace a Flow of a certain Action while other Flows keep running. Let's imagine for instance there's a FetchPostsByQuery action where the Flow constantly returns new posts matching the query. But what if the user changes the query and a new action is fired?
@wasyl View actions are completely separate. I merge them so that I get a single Flow of state where each value represents the current State
👍 1

wasyl

05/02/2020, 1:31 PM
I don't know if there's a Flow equivalent, but it seems to me you want something like RxJava's `switchMap`, right?

svenjacobs

05/02/2020, 2:37 PM
According to the documentation (it's been a while since I used RxJava) this sounds like what I would need

rkeazor

05/02/2020, 4:01 PM
Umm, you probably shouldn't use GlobalScope. Sounds like a leak waiting to happen.
You need to use a conflated channel for your actions, and then have your Flow of state emit those actions coming from the channel.

wasyl

05/02/2020, 4:12 PM
Interestingly, `switchMap` is deprecated, and the Flow docs say "Flow analogues of 'switchMap' are 'transformLatest', 'flatMapLatest' and 'mapLatest'". But it also seems like a direct replacement is `flatMapLatest`, whose documentation says:
"When the original flow emits a new value, the previous flow produced by `transform` block is cancelled"

svenjacobs

05/02/2020, 4:27 PM
@rkeazor Of course in the production code I don't use GlobalScope, that's just very simplified example code and is not the point of my question 😉
@wasyl Unfortunately `flatMapLatest` does not work for me. See my answer above.

wasyl

05/02/2020, 4:32 PM
Huh, that’s right. I think what you want is pretty difficult to do in a reactive world. It would probably be much easier to have a separate flow and handler for each of the view events and only merge the results. But if you find a way to do it with one flow I’ll be interested 😉
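The "separate flow and handler per view event, then merge the results" idea could look roughly like the sketch below. It assumes a recent kotlinx.coroutines version; the `states` function and its `tickMillis` parameter are hypothetical names introduced only for this illustration. Each event stream gets its own operator chain, so `flatMapLatest` on the `ButtonClick` stream restarts only the ticker, and `Button2Click` cannot cancel it.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

sealed class Action {
    object ButtonClick : Action()
    object Button2Click : Action()
}

data class State(val message: String)

@OptIn(ExperimentalCoroutinesApi::class)
fun states(
    buttonClicks: Flow<Action.ButtonClick>,
    button2Clicks: Flow<Action.Button2Click>,
    tickMillis: Long = 1000 // hypothetical knob, only here to speed up the demo
): Flow<State> = merge(
    // A new ButtonClick cancels only the previous ButtonClick ticker.
    buttonClicks.flatMapLatest {
        flow {
            var increment = 0
            while (true) {
                delay(tickMillis)
                increment++
                emit(State(message = "Message $increment"))
            }
        }
    },
    // Button2Click is handled on its own and never touches the ticker.
    button2Clicks.map { State(message = "Hello world") }
)

fun main() = runBlocking {
    val clicks = flowOf(Action.ButtonClick)
    val clicks2 = flow {
        delay(250)
        emit(Action.Button2Click)
    }
    // The ticker keeps counting after "Hello world" has been emitted.
    println(states(clicks, clicks2, tickMillis = 100).take(4).toList().map { it.message })
}
```

The trade-off is that every action type needs its own branch inside `merge`, which is the cost of not having a single operator that cancels per action type.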

rkeazor

05/02/2020, 4:32 PM
Sorry, I saw GlobalScope and I saw red 😂 but it sounds like you need a channel so you don't keep creating those flows, no? Or am I missing the question again?

Arkadii Ivanov

05/03/2020, 7:38 PM
With MVIKotlin you could do something like this 🙂
interface MyView : MviView<State, Intent>

interface MyStore : Store<Intent, State, Nothing> {

    sealed class Intent {
        object ButtonClick : Intent()
        object Button2Click : Intent()
    }

    data class State(
        val message: String = ""
    )
}

class MyStoreFactory(private val storeFactory: StoreFactory) {

    fun create(): MyStore =
        object : MyStore, Store<Intent, State, Nothing> by storeFactory.create(
            initialState = State(),
            executorFactory = ::ExecutorImpl,
            reducer = ReducerImpl
        ) {
        }

    private sealed class Result {
        data class Message(val message: String) : Result()
    }

    private class ExecutorImpl : SuspendExecutor<Intent, Nothing, State, Result, Nothing>() {
        private var previousScope: CoroutineScope? = null

        override suspend fun executeIntent(intent: Intent, getState: () -> State) {
            when (intent) {
                is Intent.ButtonClick -> {
                    previousScope?.cancel()

                    coroutineScope {
                        previousScope = this
                        var increment = 0
                        while (true) {
                            delay(1000)
                            increment++
                            dispatch(Result.Message("Message $increment"))
                        }
                    }
                }

                is Intent.Button2Click -> dispatch(Result.Message("Hello world"))
            }
        }
    }

    private object ReducerImpl : Reducer<State, Result> {
        override fun State.reduce(result: Result): State =
            when (result) {
                is Result.Message -> copy(message = result.message)
            }
    }
}
Then just connect inputs and outputs

wasyl

05/03/2020, 7:52 PM
Only this depends on keeping `previousScope`, which I think can also be achieved with OP’s original snippet, and isn’t a very idiomatic reactive approach

Arkadii Ivanov

05/03/2020, 7:55 PM
Well, you can try)

svenjacobs

05/04/2020, 3:01 PM
So I wrote my own Flow "operator" that solves my problem. However, I'm not quite fond of this solution, especially since I have to pass in a `CoroutineScope`. I'm happy for any suggestions on how to improve this.
/**
 * Flattens Flow by collecting each resulting Flow of incoming, transformed value `T`.
 *
 * Previous Flow collections are cancelled for each incoming, unique value `T` where distinction is
 * defined by [distinctBy].
 *
 * Flow collections are cancelled if original Flow terminates or throws exception.
 *
 * For example if transformation of value `A` returns a Flow that constantly produces values,
 * the next value of `A` cancels previous Flow collection and resulting, merged Flow will only
 * emit values of new Flow (along with Flows of other, unique values).
 */
fun <T : Any, R> Flow<T>.flatMapDistinct(
    scope: CoroutineScope,
    distinctBy: (T) -> Any = { it },
    transform: suspend (T) -> Flow<R>
): Flow<R> = channelFlow {
    val jobs = mutableMapOf<Any, Job>()

    try {
        collect { value ->
            val key = distinctBy(value)
            val flow = transform(value)

            jobs[key]?.cancel()
            jobs[key] = scope.launch {
                flow.collect { innerValue ->
                    runCatching {
                        offer(innerValue)
                    }
                }
            }
        }
    } finally {
        jobs.values.forEach { job -> job.cancel() }
    }
}
I slightly modified the function, removing the requirement for passing a `CoroutineScope`:
fun <T : Any, R> Flow<T>.flatMapDistinct(
    context: CoroutineContext = EmptyCoroutineContext,
    distinctBy: (T) -> Any = { it },
    transform: suspend (T) -> Flow<R>
): Flow<R> = channelFlow {
    supervisorScope {
        withContext(context) {
            val jobs = mutableMapOf<Any, Job>()

            try {
                collect { value ->
                    val key = distinctBy(value)
                    val flow = transform(value)

                    jobs[key]?.cancel()
                    jobs[key] = launch {
                        flow.collect { innerValue ->
                            runCatching {
                                offer(innerValue)
                            }
                        }
                    }
                }
            } finally {
                jobs.values.forEach { job -> job.cancel() }
            }
        }
    }
}
👍 1

Zach Klippenstein (he/him) [MOD]

05/06/2020, 4:05 PM
Why do you need to pass in a context at all? The channelFlow and the supervisorScope will use the context from which the flow is being collected, so if you drop the context param you can use the flowOn operator to modify that context. If you pass the context in, flowOn will not work on this operator at all, it's not as idiomatic, and you lose some safety (I believe flowOn will verify that the context doesn't include a Job, so you don't break structured concurrency).

svenjacobs

05/06/2020, 4:08 PM
@Zach Klippenstein (he/him) [MOD] Good points, I will drop the context as it's not required. Also I'm not sure about the `supervisorScope` but then again a failure of an inner collection should not abort other collections...
Also I replaced `offer` with `send`.

Zach Klippenstein (he/him) [MOD]

05/06/2020, 4:16 PM
In similar built-in operators, I believe the failure of any one child will be propagated downstream. Using a supervisor job might make sense for your specific use case, but if this were a general operator I would drop the supervisorScope because it's not idiomatic and makes actually handling errors more tricky (you'd need a flowOn or an ultimate collector context that has a CoroutineExceptionHandler, which most consumers wouldn't expect). Instead, you could use the `catch` operator on the individual flows to convert thrown exceptions into a sentinel value you can emit like normal. The advantage of that approach is that you're encouraged to convert exceptions that aren't really "exceptional" to regular values early on (in which case the type system can ensure consumers handle them), and leave the exception path for any errors that are truly exceptional/unexpected.
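Taken together, the suggestions from the last few messages (no context parameter, `send` instead of `offer`, no `supervisorScope`) might lead to something like the sketch below. This is one possible reading of the discussion, not the author's final code, and it assumes a recent kotlinx.coroutines version; `ticker` and the timings in `main` are invented for the demo.

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun <T : Any, R> Flow<T>.flatMapDistinct(
    distinctBy: (T) -> Any = { it },
    transform: suspend (T) -> Flow<R>
): Flow<R> = channelFlow {
    val jobs = mutableMapOf<Any, Job>()

    try {
        // Collects the upstream flow (the extension receiver).
        collect { value ->
            val key = distinctBy(value)
            val inner = transform(value)

            // Replace only the inner collection that belongs to the same key.
            jobs[key]?.cancel()
            // Launched in the channelFlow scope, so the collector's context
            // applies and a failing inner flow fails the whole operator.
            jobs[key] = launch {
                inner.collect { innerValue -> send(innerValue) }
            }
        }
    } finally {
        // Upstream finished or failed: stop all remaining inner collections.
        jobs.values.forEach { job -> job.cancel() }
    }
}

// Hypothetical inner flow that counts forever.
fun ticker(key: String): Flow<String> = flow {
    var i = 0
    while (true) {
        delay(100)
        emit("$key${++i}")
    }
}

fun main() = runBlocking {
    val keys = flow {
        emit("A"); delay(50)
        emit("B"); delay(200)
        emit("A"); delay(250) // second "A" restarts the A counter, B keeps counting
    }
    println(keys.flatMapDistinct { ticker(it) }.toList())
}
```

Because nothing pins the context here, a caller can still apply `flowOn` downstream to move the collection to another dispatcher.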

svenjacobs

05/06/2020, 4:22 PM
Do you mean applying `catch` on the Flow that is returned by `transform` before the Flow is returned, inside the implementation of `transform`, or afterwards in my operator? I guess before, because this encourages returning Flows that handle exceptions.

Zach Klippenstein (he/him) [MOD]

05/06/2020, 4:43 PM
Yea, you'd need to handle in transform as well - same as with other flatmap operators
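Handling errors inside `transform` with `catch`, as discussed above, could be sketched as follows. The names `PostsResult`, `fetchPosts`, and `postsFor` are hypothetical and only loosely modeled on the FetchPostsByQuery example mentioned earlier in the thread; `fetchPosts` is a stand-in for a real API call.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical result type: expected failures become ordinary values.
sealed class PostsResult {
    data class Loaded(val posts: List<String>) : PostsResult()
    data class Failed(val cause: Throwable) : PostsResult()
}

// Stand-in for an API-backed flow that emits once and then fails.
fun fetchPosts(query: String): Flow<List<String>> = flow {
    emit(listOf("$query #1"))
    throw IllegalStateException("connection lost")
}

// What a transform lambda could return: exceptions thrown upstream are
// converted into a sentinel value instead of failing the merged flow.
fun postsFor(query: String): Flow<PostsResult> =
    fetchPosts(query)
        .map<List<String>, PostsResult> { PostsResult.Loaded(it) }
        .catch { e -> emit(PostsResult.Failed(e)) }

fun main() = runBlocking {
    // The flow completes normally; the failure arrives as the last value.
    println(postsFor("kotlin").toList())
}
```

Since `Failed` is part of the sealed hierarchy, a `when` over `PostsResult` forces consumers to deal with the failure case explicitly.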
👍 1