svenjacobs
04/30/2020, 9:49 AMFlow
. In my concept everything is a Flow
, beginning with UI events. Let's say I have a view interface like this
interface MyView {
val buttonClicks: Flow<Action.ButtonClick>
val button2Clicks: Flow<Action.Button2Click>
fun setMessage(message: String)
}
I have actions
sealed class Action {
object ButtonClick : Action()
object Button2Click : Action()
}
and a state that holds UI state
data class State(
val message: String
)
The idea is that a Flow<Action
is transformed into a Flow<State>
based on the current Action and that state changes are then delegated to the view.
The - for this example extremly simplified - flow process would look like this
val flow =
merge(
view.buttonClicks,
view.button2Clicks
).flatMapMerge { action ->
when (action) {
is Action.ButtonClick -> flow {
// Simulate Flow that constantly produces values
var increment = 0
while (true) {
delay(1000)
increment++
emit(State(message = "Message $increment"))
}
}
is Action.Button2Click -> flowOf(State(message = "Hello world"))
}
}.onEach { state ->
view.setMessage(state.message)
}
// Flow is collected when view is ready
GlobalScope.launch {
flow.collect()
}
However I have a problem with this concept. Let's say Action.ButtonClick
should subscribe to some API that constantly produces values which update the UI (simulated in the example above). Unfortunately when multiple button clicks occur, multiple (inner) flows are running in parallel, each unnecessarily updating the UI. Do you have an idea how I can solve this problem, somehow stopping the previous (inner) flow? Thanks for your help!flatMapMerge
by flatMapLatest
. This works in case of multiple Action.ButtonClick
but a Action.Button2Click
would also cancel the flow...flatMapReplace
or something like that 😅uli
05/02/2020, 1:02 PMwasyl
05/02/2020, 1:05 PMsvenjacobs
05/02/2020, 1:17 PMwasyl
05/02/2020, 1:31 PMswitchMap
, right?svenjacobs
05/02/2020, 2:37 PMrkeazor
05/02/2020, 4:01 PMwasyl
05/02/2020, 4:12 PMswitchMap
is deprecated, and Flow docs say Flow analogues of 'switchMap' are 'transformLatest', 'flatMapLatest' and 'mapLatest'
. But also seems like a direct replacement is flatMapLatest
, which says
When the original flow emits a new value, the previous flow produced by `transform` block is cancelled
svenjacobs
05/02/2020, 4:27 PMwasyl
05/02/2020, 4:32 PMrkeazor
05/02/2020, 4:32 PMArkadii Ivanov
05/03/2020, 7:38 PMinterface MyView : MviView<State, Intent>
interface MyStore : Store<Intent, State, Nothing> {
sealed class Intent {
object ButtonClick : Intent()
object Button2Click : Intent()
}
data class State(
val message: String = ""
)
}
class MyStoreFactory(private val storeFactory: StoreFactory) {
fun create(): MyStore =
object : MyStore, Store<Intent, State, Nothing> by storeFactory.create(
initialState = State(),
executorFactory = ::ExecutorImpl,
reducer = ReducerImpl
) {
}
private sealed class Result {
data class Message(val message: String) : Result()
}
private class ExecutorImpl : SuspendExecutor<Intent, Nothing, State, Result, Nothing>() {
private var previousScope: CoroutineScope? = null
override suspend fun executeIntent(intent: Intent, getState: () -> State) {
when (intent) {
is Intent.ButtonClick -> {
previousScope?.cancel()
coroutineScope {
previousScope = this
var increment = 0
while (true) {
delay(1000)
increment++
dispatch(Result.Message("Message $increment"))
}
}
}
is Intent.Button2Click -> dispatch(Result.Message("Hello world"))
}
}
}
private object ReducerImpl : Reducer<State, Result> {
override fun State.reduce(result: Result): State =
when (result) {
is Result.Message -> copy(message = result.message)
}
}
}
wasyl
05/03/2020, 7:52 PMpreviousScope
, which I think can be achieved also with op’s original snippet, and isn’t very idiomatic reactive approachArkadii Ivanov
05/03/2020, 7:55 PMsvenjacobs
05/04/2020, 3:01 PMCoroutineScope
. I'm happy for any suggestions of how to improve this.
/**
* Flattens Flow by collecting each resulting Flow of incoming, transformed value `T`.
*
* Previous Flow collections are cancelled for each incoming, unique value `T` where distinction is
* defined by [distinctBy].
*
* Flow collections are cancelled if original Flow terminates or throws exception.
*
* For example if transformation of value `A` returns a Flow that constantly produces values,
* the next value of `A` cancels previous Flow collection and resulting, merged Flow will only
* emit values of new Flow (along with Flows of other, unique values).
*/
fun <T : Any, R> Flow<T>.flatMapDistinct(
scope: CoroutineScope,
distinctBy: (T) -> Any = { it },
transform: suspend (T) -> Flow<R>
): Flow<R> = channelFlow {
val jobs = mutableMapOf<Any, Job>()
try {
collect { value ->
val key = distinctBy(value)
val flow = transform(value)
jobs[key]?.cancel()
jobs[key] = scope.launch {
flow.collect { innerValue ->
runCatching {
offer(innerValue)
}
}
}
}
} finally {
jobs.values.forEach { job -> job.cancel() }
}
}
CoroutineScope
fun <T : Any, R> Flow<T>.flatMapDistinct(
context: CoroutineContext = EmptyCoroutineContext,
distinctBy: (T) -> Any = { it },
transform: suspend (T) -> Flow<R>
): Flow<R> = channelFlow {
supervisorScope {
withContext(context) {
val jobs = mutableMapOf<Any, Job>()
try {
collect { value ->
val key = distinctBy(value)
val flow = transform(value)
jobs[key]?.cancel()
jobs[key] = launch {
flow.collect { innerValue ->
runCatching {
offer(innerValue)
}
}
}
}
} finally {
jobs.values.forEach { job -> job.cancel() }
}
}
}
}
Zach Klippenstein (he/him) [MOD]
05/06/2020, 4:05 PMsvenjacobs
05/06/2020, 4:08 PMsupervisorScope
but then again a failure of an inner collection should not abort other collections...offer
by send
.Zach Klippenstein (he/him) [MOD]
05/06/2020, 4:16 PMcatch
operator on the individual flows to convert thrown exceptions into a sentinel value you can emit like normal. The advantage of that approach is that you're encouraged to convert exceptions that aren't really "exceptional" to regular values early on (in which case the type system can ensure consumers handle them), and leave the exception path for any errors that are truly exceptional/unexpected.svenjacobs
05/06/2020, 4:22 PMcatch
on the Flow that is returned by transform
before the Flow is returned, inside the implementation of transform
or afterwards in my operator? I guess before because this encourages to return Flows that handle exceptions.Zach Klippenstein (he/him) [MOD]
05/06/2020, 4:43 PM