Erik
03/28/2020, 8:30 PM
Channel and bro.receiveAsFlow(), but that results in fan-out: each of the four values sent is handled once by whichever collector is available, rather than every value reaching every collector. The alternative is to use a BroadcastChannel and bro.asFlow() instead, but then the collectors receive... nothing! :\
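The fan-out behaviour with a regular Channel and receiveAsFlow() can be reproduced with a minimal sketch like the one below. It assumes kotlinx.coroutines is on the classpath; the names fanOut and seen are made up for illustration. Which collector gets which value is not fixed, but each value shows up exactly once in total.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: two collectors share one regular Channel via receiveAsFlow().
// Returns (collector name, value) pairs in the order they were received.
fun fanOut(): List<Pair<String, Int>> = runBlocking {
    val channel = Channel<Int>(Channel.BUFFERED)
    val flow = channel.receiveAsFlow()
    val seen = mutableListOf<Pair<String, Int>>()

    // Both collectors read from the same underlying channel.
    val collectors = listOf("a", "b").map { name ->
        launch { flow.collect { value -> seen.add(name to value) } }
    }

    for (n in 0 until 4) channel.send(n)
    channel.close() // closing the channel lets both collectors complete
    collectors.joinAll()
    seen
}

fun main() {
    val seen = fanOut()
    seen.forEach { (name, value) -> println("collector $name: $value") }
    // Every value was delivered once across both collectors, none twice.
    println(seen.map { it.second }.sorted()) // [0, 1, 2, 3]
}
```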
1. Why don't they receive any values? I.e. what am I doing wrong?
2. How can I make this work?
As far as I understand, BroadcastChannel can be used to send a value to multiple receivers. And the asFlow() extension on it should open a new subscription for every collector? Clearly I'm doing something wrong, but I don't see it.
Bonus questions I don't know the answer to:
3. Is this possible at all with channels and flows? I found this open issue (https://github.com/Kotlin/kotlinx.coroutines/issues/1261) about a Flow.share operator, which obviously doesn't exist yet. Is this what I might be looking for?
Dennis
03/28/2020, 8:41 PM
openSubscription https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-broadcast-channel/open-subscription.html
Dennis
03/28/2020, 8:42 PM
tseisel
03/28/2020, 9:11 PM
BroadcastChannel. If you use a regular channel, values will be distributed among consumers, each receiving different values.
If you look at the docs closely, you'll notice that `BroadcastChannel`s lose items that are sent before the first consumer subscribes; that's probably why your collectors did not receive anything :/
tseisel
03/28/2020, 9:19 PM
Channel, then create a BroadcastChannel with the Channel.broadcast() extension function. The BroadcastChannel will not lose elements this way because it will start pulling from the regular channel only after the first subscriber shows up.
Erik
03/28/2020, 9:29 PM
runBlocking { delay(1000) } on line 26 in the code snippet (before sending to the channel), my collectors all receive values 0, 1 and 2.
Erik
03/28/2020, 9:29 PM
Erik
03/28/2020, 9:29 PM
Erik
03/28/2020, 10:08 PM
streetsofboston
03/28/2020, 11:46 PM
Brendan Weinstein
03/29/2020, 1:05 AM
asFlow on the underlying broadcast channel multiple times.
val channel = BroadcastChannel<Int>(BUFFERED)
launch {
    var i = 0
    while (true) {
        channel.send(i++)
        delay(1000)
    }
}
launch {
    channel.asFlow()
        .collect {
            println("receiver a: $it")
        }
}
launch {
    channel.asFlow()
        .collect {
            println("receiver b: $it")
        }
}
You can run the above and see that receiver a and receiver b receive every value offered to the channel.
tseisel
03/29/2020, 11:17 AM
Erik
03/29/2020, 1:59 PM
Dennis
03/29/2020, 2:00 PM
Erik
03/29/2020, 2:01 PM
Erik
03/29/2020, 9:06 PM
val source: ReceiveChannel<Int> = produce<Int> {
    for (n in -1..2) {
        send(n)
        delay(100)
    }
}
to
val source = Channel<Int>(Channel.BUFFERED)
launch {
    for (n in -1..2) {
        source.send(n)
        delay(100)
    }
}
then the script never terminates: it runs forever, or it suspends somewhere, but I don't know where, when or why. Do you know why?
tseisel
03/29/2020, 9:12 PM
produce implicitly calls source.close() when its block is completed. Calling source.close() after the for loop in the launch block should do the trick!
Erik
03/30/2020, 5:53 AM
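A minimal sketch of tseisel's suggestion, assuming kotlinx.coroutines is on the classpath; drain is a hypothetical helper name, and the receiving side here is a plain for loop rather than the broadcast setup from the original script. A Channel fed from a launch block stays open until close() is called, so the receiving loop only ends once the producer closes it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: sends -1..2 from a launch block, closes the channel,
// and returns everything the receiving loop saw.
fun drain(): List<Int> = runBlocking {
    val source = Channel<Int>(Channel.BUFFERED)
    launch {
        for (n in -1..2) {
            source.send(n)
            delay(100)
        }
        source.close() // produce { } closes its channel when the block ends; launch does not
    }
    val received = mutableListOf<Int>()
    // Iterating a channel suspends until it is closed; without close() above
    // this loop (and therefore runBlocking) would never finish.
    for (n in source) received.add(n)
    received
}

fun main() {
    println(drain()) // [-1, 0, 1, 2]
}
```

Removing the source.close() line reproduces the hang described above: the loop keeps waiting for a next element that never arrives.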