# coroutines
l
What's the most idiomatic way to listen to multiple channels of different types and run a method with all the latest values each time? I mean, let's say I have
channelA: Channel<A>
and
channelB: Channel<B>
, I want to call a method like
update(a: A, b: B)
each time I get a value from any channel, that is, if I receive from
channelA
, I want it to be called with the new
A
value, and the same
B
value as previously received. Each time I have a use case for it, I feel like I'm writing throwaway code that is error-prone and full of boilerplate. If you have any idea how to do it right once and for all, in a generic way, please tell! Bonus points if your solution also works with other callbacks and not just
Channel
g
Maybe something like a
merge
operator would help? Just merge multiple channels into one
l
Yes, probably, but I'm not aware of anything like this existing for
Channel
(I've searched through autocompletion, and I only found
zip
which does not do what I want as it needs a new value from each channel)
j
ZipOnFirst?
l
I think what I'm looking for is kind of a conflated zip, and preferably without allocation of intermediate objects like
Pair
,
Triple
or other grouping objects, if this is possible
g
Yeah, merge is not available as far as I know, but you can easily write your own implementation, something like:
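import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Forwards every element from all input channels into one output channel.
// produce is an extension on CoroutineScope in current kotlinx.coroutines.
fun <T> CoroutineScope.merge(vararg channels: ReceiveChannel<T>): ReceiveChannel<T> = produce {
    for (c in channels) {
        // One child coroutine per input; the output closes once all children finish.
        launch {
            for (v in c) {
                send(v)
            }
        }
    }
}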
(just a draft, not tested, probably can be rewritten with select instead)
l
I don't know how to use select 😅
That looks like what I'm looking for, except it doesn't close the channels (which may cause leaks), but this is fixable. The biggest issue is that it is not conflated, and you lose type information if you merge channels of different types
Here's a version properly closing the channels:
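import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun <T> CoroutineScope.merge(vararg channels: ReceiveChannel<T>): ReceiveChannel<T> = produce {
    try {
        // Forward each input in its own child coroutine and wait for all of them.
        channels.map { channel ->
            launch {
                for (e in channel) send(e)
            }
        }.joinAll()
    } finally {
        // Cancel the inputs on completion, failure, or cancellation so none stay open.
        channels.forEach { it.cancel() }
    }
}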
g
Yeah, correct, or just use consumeEach
"you lose type information if you merge channels of different types"
You can use a wrapper over your data, or just Any
"is not conflated"
You can use a conflated channel under the hood, but then it can no longer be just an operator; it would probably be an implementation detail of a class
l
consumesAll
seems less efficient than a simple
forEach
in this case
w
You can use a sealed class where each channel returns a different sub-type
(for the typing issue)
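A minimal sketch of the sealed-class idea, using only the standard library. The names `Event`, `FromA`, `FromB` and `LatestAB` are hypothetical, and `Int`/`String` stand in for the `A` and `B` types from the question. Each source wraps its values in its own subtype, so a single merged stream keeps the type information, and the consumer remembers the latest value of each kind:

```kotlin
// Hypothetical wrapper type: one subtype per source channel.
sealed class Event {
    data class FromA(val value: Int) : Event()
    data class FromB(val value: String) : Event()
}

// Remembers the latest value of each kind and calls update once both are known.
class LatestAB(private val update: (Int, String) -> Unit) {
    private var a: Int? = null
    private var b: String? = null

    fun onEvent(event: Event) {
        when (event) {
            is Event.FromA -> a = event.value
            is Event.FromB -> b = event.value
        }
        val latestA = a
        val latestB = b
        if (latestA != null && latestB != null) update(latestA, latestB)
    }
}

fun main() {
    val calls = mutableListOf<String>()
    val latest = LatestAB { a, b -> calls.add("$a$b") }
    // Stand-in for events arriving from a merged channel.
    listOf(Event.FromA(1), Event.FromB("x"), Event.FromA(2)).forEach(latest::onEvent)
    println(calls) // [1x, 2x]
}
```

This does allocate one wrapper object per element, which is the cost the original question hoped to avoid.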
l
I found a solution to my use case, that can conflate multiple channels and callbacks together into one channel. Here's the source code: https://gist.github.com/LouisCAD/d9d1adbd3b4fd36d9dc121658fefd16f If you are interested but don't understand how it works, feel free to ping me, I'll show you a snippet or add an example to the gist that uses the class.
b
i probably would have used select...
l
@bj0 How? Until now I've had a hard time using it
b
i mean it just returns the first item available from either channel
so
select
on channels ->
update
on returned item
l
That's not what my code is designed for. It transforms conflated values to a single one when
conflateAll
is called (on each update from any callback/channel)
b
ah well
select
would take care of the 'get a value from any channel' part
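For anyone else unsure how `select` fits here, this is a rough sketch of the idea described above, assuming a recent kotlinx.coroutines (1.5 or later, for `onReceiveCatching`). The helper name `updateOnEach` is hypothetical, and the types are restricted to non-null values so that `null` can mean "nothing received yet". A channel closed with a failure is simply treated as closed in this sketch.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Hypothetical helper: keeps the latest A and B and calls update each time
// either channel delivers a new value, until both channels are closed.
suspend fun <A : Any, B : Any> updateOnEach(
    channelA: ReceiveChannel<A>,
    channelB: ReceiveChannel<B>,
    update: (A, B) -> Unit
) {
    var latestA: A? = null
    var latestB: B? = null
    var aOpen = true
    var bOpen = true
    while (aOpen || bOpen) {
        // select suspends until one of the registered clauses is ready.
        val gotValue = select<Boolean> {
            if (aOpen) channelA.onReceiveCatching { result ->
                val value = result.getOrNull()
                if (value != null) latestA = value else aOpen = false
                value != null
            }
            if (bOpen) channelB.onReceiveCatching { result ->
                val value = result.getOrNull()
                if (value != null) latestB = value else bOpen = false
                value != null
            }
        }
        val a = latestA
        val b = latestB
        // Only call update for a real value, and only once both sides are known.
        if (gotValue && a != null && b != null) update(a, b)
    }
}

fun main() = runBlocking {
    val channelA = Channel<Int>(Channel.UNLIMITED)
    val channelB = Channel<String>(Channel.UNLIMITED)
    channelA.trySend(1)
    channelB.trySend("x")
    channelA.trySend(2)
    channelA.close()
    channelB.close()
    updateOnEach(channelA, channelB) { a, b -> println("update($a, $b)") }
}
```

No `Pair` or wrapper objects are allocated per element, but the helper is fixed to two channels; more inputs would need more clauses.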
d
l
@Daniel Tam Yes, this is exactly what I'm looking for, but I'm not using RxJava. I have a preference for the solution I implemented because it can work with any callbacks, not just channels
Also,
combineLatest
seems to work for only 2 `Observable`s, while my
ConflatedValues
class works for any number of properties/callbacks/channels
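For readers on a current kotlinx.coroutines release: the `Flow` API, which is newer than this conversation, has a `combine` operator with the same "latest of each" semantics as `combineLatest`, and a channel can be exposed as a flow with `receiveAsFlow()`. A small sketch, assuming `Int` and `String` as the element types and a hypothetical `latestCombined` helper:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo helper: collects every combined value until both channels close.
fun latestCombined(): List<String> = runBlocking {
    val channelA = Channel<Int>(Channel.UNLIMITED)
    val channelB = Channel<String>(Channel.UNLIMITED)
    channelA.trySend(1)
    channelA.trySend(2)
    channelB.trySend("x")
    channelA.close()
    channelB.close()

    // combine waits until each flow has emitted once, then re-runs the
    // lambda with the latest value of each whenever either one emits.
    combine(channelA.receiveAsFlow(), channelB.receiveAsFlow()) { a, b -> "$a$b" }
        .toList()
}

fun main() {
    println(latestCombined())
}
```

Intermediate combinations depend on timing, so only the final element (here the pairing of the last `Int` with the last `String`) should be relied on in a demo like this. `combine` also has overloads for more than two flows.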