Hi, did anyone combine the values from several `Re...
# coroutines
l
Hi, did anyone combine the values from several `ReceiveChannel`s? What approach did you take?
k
You could create a subclass of channel that takes vararg amounts of channels as a constructor param. For each channel, create a coroutine and
recieve
the element from that channel. Each time you receive,
send
to
this
channel
d
I usually redesign. If I can't be arsed I use
select
.
👍 1
p
Use asFlow and then combineLatest
b
I use to use
select
in a
produce
, but I haven't looked at any of the new
Flow
stuff
l
Oh, I forgot to mention that all channels have different types and that I want to combine them into a data class sent/broadcast to a new channel each time one new value is received
g
@louiscad is asking a question on coroutines channel. Must be a tough one. Sorry for the noise. 😉
😅 1
b
@gaetan Pretty sure he'll ace the implementation though. 😜 Also sorry for noise.
🤔 1
b
again, fairly easy to do with
select
and
produce
l
My current implementation is 47 lines of boilerplate code (not counting imports, no comments/KDoc lines) to combine 4 channels, each of different element types into a data class broadcast in a new
ConflatedBroadcastChannel
. I wish I could have a shorter solution that could scale better, that's why I asked. Also, I have no clue how I can do this with
select
and
produce
.
d
What do you mean by combine? You want to get an item from each one, put all the items in a data class with 4 fields and then return the instance? Why not just call receive on each channel. Or you want to wait for the the first item from any channel and return a transformation/map of the item?
l
For now, that's what I do, I have a call to receive for each channel, I keep the last received value and each time I receive on new value for any of each channel, I update the last value and update the data class for all last values to be broadcast. I wish I found a more generic or less verbose way to do it for just 4 channels. It's basically over 10 lines of code per channel for now, and I use a few extensions
d
How about
DataClass(channel1.receive(), channel2.receive(), channel3.receive(), channel4.receive())
?
l
I also need updates, when only one of the channels produces a new value, I need a new dataclass with the last values previously received from the other channels.
d
Oh I get it now, you need a stream of latest values.
https://pl.kotl.in/AGzcQBHsL . Doesn't handle input channels closing though.
❤️ 1
b
Yea that's basically what I had in my mind, but with
= produce {
👍 1
p
If one or two channel gets cancelled
Copy code
var latestOne: E1 = one.receive()
var latestTwo: E2 = two.receive()

select<Unit> {
        one.onReceive { latestOne = it }
        two.onReceive { latestTwo = it }
}
then these statements will throw
CancellationException
. Is this the behaviour you want?
l
Yes, that shouldn't be a problem since it'll be eaten by
launch
. I should cancel the two other channels though. I edited the snippet to return
ReceiveChannel<T>
BTW.
I also edited to cancel the channels on scope completion.
p
Yes, that shouldn't be a problem since it'll be eaten by
launch
But it will cancel the whole scope if it is not
Supervised
.
Copy code
runBlocking {
            val a = produce {
                while (true) {
                    delay(1_000)
                    send(1)
                }
            }
            val b = produce {
                while (true) {
                    delay(2_000)
                    send(2)
                }
            }

            coroutineScope {
                launch {
                    while (true) {
                        println("Test")
                        delay(1_000)
                    }
                }

                launch {
                    combineLatest(a, b) { a, b -> Pair(a, b) }.consumeEach {
                        println(it)
                    }
                }

                delay(10_000)
                a.cancel()
            }
        }
When
a.cancel()
is called it will throw an
Exception
and cancel the coroutine that prints "Test".
l
This:
Copy code
runBlocking {
    launch {}.cancel()
}
doesn't throw because cancellation is not a crash, so it doesn't propagate to parent scope when the coroutine is concurrent (as with
launch
or
async
). So I don't see a problem here.
d
@louiscad if one channel receives 2 items before the other receives any, does your solution drop the second item for the first channel? Depends on
onReceive
behaviour.
d
If this is the only receiver, it doesn't.
l
@Dico The second value would not be received until the select is reached, and then, it'd be received immediately (unless the other channel has a value that is selected first because of bias), and would trigger a new transform and sending.