#coroutines

louiscad

08/20/2018, 8:22 AM
What's the most idiomatic way to listen to multiple channels of different types and run a method with all the latest values each time? Say I have `channelA: Channel<A>` and `channelB: Channel<B>`, and I want to call a method like `update(a: A, b: B)` each time I get a value from any channel. That is, if I receive from `channelA`, I want it to be called with the new `A` value and the same `B` value as previously received. Each time I have a use case for it, I feel like I'm writing disposable/throwaway code that is a bit error-prone and quite boilerplate-y. If you have any clue about a solution to do it right once and for all, in a kind of generic way, please tell! Bonus points if your solution can also work with other callbacks and not just `Channel`.

gildor

08/20/2018, 8:22 AM
Maybe something like a `merge` operator would help? Just merge multiple channels into one.

louiscad

08/20/2018, 8:24 AM
Yes, probably, but I'm not aware of anything like this existing for `Channel` (I've searched through autocompletion, and I only found `zip`, which does not do what I want as it needs a new value from each channel).

julioyg

08/20/2018, 8:25 AM
ZipOnFirst?

louiscad

08/20/2018, 8:27 AM
I think what I'm looking for is kind of a conflated zip, and preferably without allocation of intermediate objects like `Pair`, `Triple` or other grouping objects, if this is possible.

gildor

08/20/2018, 8:29 AM
Yeah, `merge` is not available as far as I know, but you can easily write your own implementation, something like:
```kotlin
fun <T> merge(vararg channels: ReceiveChannel<T>) = produce {
    for (c in channels) {
        launch(coroutineContext) {
            for (v in c) {
                send(v)
            }
        }
    }
}
```
(Just a draft, not tested; it can probably be rewritten with `select` instead.)

louiscad

08/20/2018, 8:31 AM
I don't know how to use select 😅
That looks like what I'm looking for, except it doesn't close the channels (which may cause leaks), but this is fixable. The biggest issue is that it is not conflated, and you lose type information if you merge channels of different types.
Here's a version that properly closes the channels:
```kotlin
fun <T> merge(vararg channels: ReceiveChannel<T>) = produce {
    try {
        channels.map { channel ->
            launch(coroutineContext) {
                for (e in channel) send(e)
            }
        }.joinAll()
    } finally {
        channels.forEach { it.cancel() }
    }
}
```
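
The two `merge` snippets above appear to target the 2018 kotlinx.coroutines API, where `produce` was a top-level builder. A minimal sketch of the same idea for later releases, assuming `produce` and `launch` are called as `CoroutineScope` extensions; making `merge` a `CoroutineScope` extension and the `demo` function are choices made here for illustration, not something from the thread:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: forwards every element of each source into one channel.
fun <T> CoroutineScope.merge(vararg channels: ReceiveChannel<T>): ReceiveChannel<T> = produce {
    try {
        // One forwarding coroutine per source, all children of the producer.
        channels.map { channel ->
            launch { for (e in channel) send(e) }
        }.joinAll()
    } finally {
        // Cancel the sources whether forwarding finished or was cancelled.
        channels.forEach { it.cancel() }
    }
}

// Illustration only: merges two pre-filled channels and collects the result.
fun demo(): List<Int> = runBlocking {
    val first = Channel<Int>(Channel.UNLIMITED)
    val second = Channel<Int>(Channel.UNLIMITED)
    listOf(1, 2).forEach { first.trySend(it) }
    listOf(3, 4).forEach { second.trySend(it) }
    first.close()
    second.close()

    val merged = mutableListOf<Int>()
    for (value in merge(first, second)) merged += value
    merged // arrival order across sources is not guaranteed
}

fun main() {
    println(demo().sorted())
}
```

The result is sorted before printing because the interleaving of elements from different sources depends on scheduling.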

gildor

08/20/2018, 8:39 AM
Yeah, correct, or just use `consumeEach`.
> you lose type informations if you merge channels of different types
You can use a wrapper over your data, or just `Any`.
> is not conflated
You can use a conflated channel under the hood, but then it can no longer be just an operator; it would probably be an implementation detail of a class.
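
To illustrate what "conflated" means here: a minimal sketch, assuming a channel created with `Channel.CONFLATED`, in which a receiver that falls behind only sees the most recent value. The `demo` function is hypothetical and exists only for illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Illustration only: a conflated channel keeps just the most recent element.
fun demo(): Int = runBlocking {
    val latest = Channel<Int>(Channel.CONFLATED)
    // trySend never suspends; each new value replaces the unreceived one.
    latest.trySend(1)
    latest.trySend(2)
    latest.trySend(3)
    latest.receive() // only the last value sent is still available
}

fun main() {
    println(demo())
}
```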

louiscad

08/20/2018, 8:42 AM
`consumesAll` seems less efficient than a simple `forEach` in this case.

withoutclass

08/20/2018, 9:23 AM
You can use a sealed class where each channel returns a different sub-type (for the typing issue).
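
A minimal sketch of the sealed-class suggestion, assuming two source channels carrying `Int` and `String`. The names `Event`, `FromA`, `FromB`, `mergeAsEvents`, `describe` and `demo` are all hypothetical and only serve the illustration:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical wrapper type: one subtype per source channel.
sealed class Event {
    data class FromA(val value: Int) : Event()
    data class FromB(val value: String) : Event()
}

// Hypothetical helper: tags each element with its origin while merging.
fun CoroutineScope.mergeAsEvents(
    channelA: ReceiveChannel<Int>,
    channelB: ReceiveChannel<String>,
): ReceiveChannel<Event> = produce {
    launch { for (a in channelA) send(Event.FromA(a)) }
    launch { for (b in channelB) send(Event.FromB(b)) }
}

// Exhaustive `when`: the compiler checks that every subtype is handled.
fun describe(event: Event): String = when (event) {
    is Event.FromA -> "A=${event.value}"
    is Event.FromB -> "B=${event.value}"
}

// Illustration only: merges two pre-filled channels of different types.
fun demo(): List<String> = runBlocking {
    val channelA = Channel<Int>(Channel.UNLIMITED)
    val channelB = Channel<String>(Channel.UNLIMITED)
    channelA.trySend(1); channelA.close()
    channelB.trySend("x"); channelB.close()

    val seen = mutableListOf<String>()
    for (event in mergeAsEvents(channelA, channelB)) seen += describe(event)
    seen // arrival order across sources is not guaranteed
}

fun main() {
    println(demo().sorted())
}
```

This keeps the type information, at the cost of one wrapper allocation per element, which is the kind of intermediate object the original question hoped to avoid.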

louiscad

08/20/2018, 2:50 PM
I found a solution to my use case, that can conflate multiple channels and callbacks together into one channel. Here's the source code: https://gist.github.com/LouisCAD/d9d1adbd3b4fd36d9dc121658fefd16f If you are interested but don't understand how it works, feel free to ping me, I'll show you a snippet or add an example to the gist that uses the class.

bj0

08/20/2018, 5:26 PM
i probably would have used select...

louiscad

08/20/2018, 6:54 PM
@bj0 How? Until now I've had a hard time using it

bj0

08/20/2018, 7:04 PM
i mean it just returns the first item available from either channel, so `select` on channels -> `update` on returned item
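
A minimal sketch of that `select` approach, assuming a recent kotlinx.coroutines release where `onReceiveCatching` is available. The helper `updateOnEach` and the `demo` function are hypothetical names, and waiting until both channels have produced a value before the first `update` call is an assumption made here, not something stated in the thread:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Hypothetical helper: calls `update` with the latest A and B whenever
// either channel delivers a value, until both channels are closed.
@Suppress("UNCHECKED_CAST")
suspend fun <A, B> updateOnEach(
    channelA: ReceiveChannel<A>,
    channelB: ReceiveChannel<B>,
    update: (A, B) -> Unit,
) {
    var latestA: A? = null
    var latestB: B? = null
    var hasA = false
    var hasB = false
    var openA = true
    var openB = true

    // Assumption: nothing is reported until both sides have a value.
    fun notifyIfReady() {
        if (hasA && hasB) update(latestA as A, latestB as B)
    }

    while (openA || openB) {
        // select suspends until one of the registered clauses can proceed.
        select<Unit> {
            if (openA) channelA.onReceiveCatching { result ->
                if (result.isSuccess) {
                    latestA = result.getOrThrow()
                    hasA = true
                    notifyIfReady()
                } else {
                    openA = false // channel A was closed
                }
            }
            if (openB) channelB.onReceiveCatching { result ->
                if (result.isSuccess) {
                    latestB = result.getOrThrow()
                    hasB = true
                    notifyIfReady()
                } else {
                    openB = false // channel B was closed
                }
            }
        }
    }
}

// Illustration only: one producer alternates between two rendezvous channels.
fun demo(): List<String> = runBlocking {
    val channelA = Channel<Int>()
    val channelB = Channel<String>()
    launch {
        channelA.send(1)
        channelB.send("x")
        channelA.send(2)
        channelB.send("y")
        channelA.close()
        channelB.close()
    }
    val calls = mutableListOf<String>()
    updateOnEach(channelA, channelB) { a, b -> calls += "$a$b" }
    calls
}

fun main() {
    println(demo())
}
```

No `Pair` or wrapper object is allocated per element here, but the helper is fixed to two channels; each additional source needs its own clause and state.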

louiscad

08/21/2018, 12:03 AM
That's not what my code is designed for. It transforms conflated values to a single one when `conflateAll` is called (on each update from any callback/channel).

bj0

08/21/2018, 12:05 AM
ah well, `select` would take care of the 'get a value from any channel' part

Daniel Tam

08/21/2018, 6:48 AM

louiscad

08/21/2018, 7:28 AM
@Daniel Tam Yes, this is exactly what I'm looking for, but I'm not using RxJava. I have a preference for the solution I implemented because it can work with any callbacks, not just channels.
Also, `combineLatest` seems to work for only 2 `Observable`s, while my `ConflatedValues` class works for any number of properties/callbacks/channels.
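
For readers on later kotlinx.coroutines releases, which postdate this thread: the `Flow` API has a `combine` operator that builds each result from the most recently emitted value of every source, and a channel can be exposed as a flow with `receiveAsFlow()`. A minimal sketch under those assumptions; the `demo` function and the sample values are hypothetical:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Illustration only: combines the latest values of two channels via Flow.
fun demo(): List<String> = runBlocking {
    val channelA = Channel<Int>(Channel.UNLIMITED)
    val channelB = Channel<String>(Channel.UNLIMITED)
    channelA.trySend(1); channelA.trySend(2); channelA.close()
    channelB.trySend("x"); channelB.trySend("y"); channelB.close()

    // The lambda plays the role of update(a, b) from the original question.
    combine(channelA.receiveAsFlow(), channelB.receiveAsFlow()) { a, b -> "$a$b" }
        .toList()
}

fun main() {
    // Intermediate combinations depend on scheduling; the last one holds
    // the final value of each source.
    println(demo().last())
}
```

Overloads of `combine` for more than two flows also exist, which covers the "any number of sources" concern for channels, though not for arbitrary callbacks.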