What's the best way to combine a `Flow<State&gt...
# coroutines
n
What's the best way to combine a
Flow<State>
that represents some state and a
Channel<Event>
? Regular combine(...) does not work well, because the event should only be processed once (against the most recent state). I currently achieve this with a pretty complex channelFlow { } with two coroutines and saving stuff in local `var`s, but I wonder if there's a smarter way.
d
You're looking for
withLatestFrom
. There's an open issue on GitHub.
n
Kind of... Looks like when
other
emits, the transform is not invoked https://github.com/Kotlin/kotlinx.coroutines/issues/1498 while I'd like to react on both
But yeah, the code there is similar to what I have, probably this is the way to go... thanks!
d
Oh, are you looking for
zip
then?
n
Do you just want to know the most recent State? If so,
stateIn
can give you a
StateFlow
so you can just grab the value.
n
Not really, sorry if not clear. Let's say I have two flows - one represents state, the other is an event. I make this distinction because event should only ever be processed once. I want to combine them with a
transform: (State, Event?) -> SomethingElse
. Solutions based on combine have two issues: • transform won't be called until the first
Event
• transform will be passed the latest
Event
multiple times, if state emits fast. this is hard to workaround I ended up with this, not sure if best option but seems to work:
Copy code
// consume T2, never send it twice
fun <T1, T2: Any, R> Flow<T1>.combineConsuming(
    flow: Flow<T2>,
    block: suspend (T1, T2?) -> R
) : Flow<R> {
    val flow1 = this
    val flow2 = flow
    return channelFlow {
        val latest1 = MutableSharedFlow<T1>(replay = 1)

        val job2 = flow2.onEach { t2 ->
            val t1 = latest1.first()
            send(block(t1, t2))
        }.launchIn(this)

        flow1.onEach { t1 ->
            send(block(t1, null))
        }.onCompletion {
            job2.cancel()
        }.launchIn(this)
    }
}
d
I'm still confused tbh. When do you want to
transform
to be called?
In terms of Flow<A> and Flow<B>. (Don't mention state and events because they're confusing)
n
Whenever A or B change 🙂 and every
b: B
should be passed to transform exactly once.
d
So if B emits, you want A to repeat but if A emits you B to be null.
n
Yes that's one way to look at it - it's what my snippets does basically
d
I'm guessing you don't want emissions to start until A emits?
I've left out error handling as an exercise to the reader.
Copy code
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <A, B : Any, R> Flow<A>.doThing(
	flow: Flow<B?>,
	transform: suspend (A, B?) -> R
): Flow<R> {
    return flow {
        coroutineScope {
            val aChannel = this@doThing.produceIn(this)
            val bChannel = flow.produceIn(this)

            var lastA: A = aChannel.receive()
            emit(transform(lastA, bChannel.tryReceive().getOrNull()))

            whileSelect {
                aChannel.onReceive { a ->
                    emit(transform(a, null))
                    lastA = a
                    true
                }
                bChannel.onReceive { b ->
                    emit(transform(lastA, b))
                    true
                }
            }
        }
    }
}
n
@natario1 I think what you have is fine if it works. I assume copy/paste error is why what you posted doesn't work at all (
latest1
is never populated,
first()
just suspends). Your approach works just fine as a way to implement your own operator. Sometimes, building your own can be easier to read than a rube goldberg machine using existing operators.
n
Yeah I missed one line 😄 I'll check the differences with the other snippet, overall looks like I'm not missing any magic operator. Thanks a lot guys!