#coroutines

Erik

03/28/2020, 8:30 PM
Edit: questions 1 and 2 were answered in the thread; question 3 remains unanswered.

I'm trying to broadcast values to multiple flow collectors. The goal is for all collectors to receive the same values. In this example, the values -1, 0, 1 and 2 are sent over the channel. There are three collectors, so all three should receive the four values, for a total of 12 collect events. I tried using a `Channel` and `bro.receiveAsFlow()`, but that results in fan-out: each of the four values is handled once by whichever collector is available, not by all collectors. The alternative is to use a `BroadcastChannel` and `bro.asFlow()` instead, but then the collectors receive... nothing! :\

1. Why don't they receive any values? I.e. what am I doing wrong?
2. How can I make this work?

As far as I understand, `BroadcastChannel` can be used to send a value to multiple receivers, and the `asFlow()` extension on it should open a new subscription for every collector? Clearly I'm doing something wrong, but I don't see it.

Bonus question I don't know the answer to:

3. Is this possible at all with channels and flows? I found this open issue (https://github.com/Kotlin/kotlinx.coroutines/issues/1261) about a `Flow.share` operator, which obviously doesn't exist yet. Is this what I might be looking for?
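The fan-out behaviour described in the question can be reproduced with a small sketch. It assumes a regular `Channel` shared by three collectors via `receiveAsFlow()`; the function name `collectFanOut` and the `(collector id, value)` bookkeeping are hypothetical and only there for illustration. Which collector ends up with which value is not guaranteed, only that each value is delivered once.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns every (collector id, value) pair that was observed.
fun collectFanOut(): List<Pair<Int, Int>> = runBlocking {
    val channel = Channel<Int>(Channel.BUFFERED)
    // runBlocking is single-threaded here, so a plain mutable list is safe.
    val received = mutableListOf<Pair<Int, Int>>()

    // Three collectors share the same channel.
    val collectors = List(3) { id ->
        launch {
            channel.receiveAsFlow().collect { value -> received += id to value }
        }
    }

    for (n in -1..2) channel.send(n)
    channel.close()       // lets the three flows complete
    collectors.joinAll()
    received
}

fun main() {
    val events = collectFanOut()
    events.forEach { (id, value) -> println("collector $id got $value") }
    println("total collect events: ${events.size}") // 4, not the desired 12
}
```

Each of the four values shows up exactly once across all collectors, which is the fan-out result rather than a broadcast.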

Dennis

03/28/2020, 8:41 PM
My bad, asFlow should do that.

tseisel

03/28/2020, 9:11 PM
If you need to send the same values to all consumers, you effectively need a `BroadcastChannel`. If you use a regular channel, values will be distributed among the consumers, each receiving different values. If you look at the docs closely, you'll notice that `BroadcastChannel`s lose items that are sent before the first consumer subscribes; that's probably why your collectors did not receive anything :/
I suggest sending your elements to a regular `Channel`, then creating a `BroadcastChannel` with the `Channel.broadcast()` extension function. The `BroadcastChannel` will not lose elements this way, because it starts pulling from the regular channel only after the first subscriber shows up.

Erik

03/28/2020, 9:29 PM
Aha! That led me to something: if I add `runBlocking { delay(1000) }` on line 26 in the code snippet (before sending to the channel), my collectors all receive values 0, 1 and 2.
Not -1, which was sent to the broadcast channel before there were open subscriptions.
Thanks for the hint!
That leaves question 3 open
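On question 3: releases of kotlinx.coroutines newer than this thread (1.4.0 onward) added `SharedFlow`, which nobody here mentions and which was not available at the time. The following is only a sketch of how the three-collector goal might look with it, assuming such a version is in use. The function name `broadcastToAll` is hypothetical, and `take(4)` is used only so that the collectors terminate.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns what each of the three collectors received.
fun broadcastToAll(): List<List<Int>> = runBlocking {
    // No replay and no extra buffer: emit() suspends until every subscriber has the value.
    val shared = MutableSharedFlow<Int>()

    // Each collector stops after four values so the example can finish.
    val collectors = List(3) {
        async { shared.take(4).toList() }
    }

    // Wait until all three collectors have subscribed, so no value is missed.
    shared.subscriptionCount.first { it == 3 }

    for (n in -1..2) shared.emit(n)
    collectors.awaitAll()
}

fun main() {
    broadcastToAll().forEachIndexed { id, values ->
        println("collector $id got $values") // every collector gets [-1, 0, 1, 2]
    }
}
```

Waiting on `subscriptionCount` plays the same role as the `delay(1000)` workaround above: values are sent only once all subscriptions are open, giving 12 collect events in total.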

streetsofboston

03/28/2020, 11:46 PM
I don't have my computer here right now, but a Flow (e.g. a Flow that is the ultimate source flow) has a `broadcastIn` function, which returns a broadcast channel. From there you can call `openSubscription` and then `asFlow`.

Brendan Weinstein

03/29/2020, 1:05 AM
You can get multicasting by calling `asFlow` on the underlying broadcast channel multiple times.
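```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val channel = BroadcastChannel<Int>(BUFFERED)

    // Producer: sends an increasing counter once per second, forever.
    launch {
        var i = 0
        while (true) {
            channel.send(i++)
            delay(1000)
        }
    }

    // Each asFlow() collection opens its own subscription to the channel.
    // As noted earlier in the thread, values sent before a subscription
    // is open are not delivered to that subscriber.
    launch {
        channel.asFlow().collect { println("receiver a: $it") }
    }

    launch {
        channel.asFlow().collect { println("receiver b: $it") }
    }
}
```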
You can run the above and see that receiver `a` and receiver `b` receive every value offered to the channel.

tseisel

03/29/2020, 11:17 AM
@Erik Does this match your needs? https://pl.kotl.in/92uuHsRbu

Erik

03/29/2020, 1:59 PM
How did you make that shortened URL from play.kotlinlang.org? I don't see an option to share or shorten the URL (it can become very long)

Dennis

03/29/2020, 2:00 PM
message has been deleted

Erik

03/29/2020, 2:01 PM
Whoa, I don't have that button! Must be some tracker blocker that hides it from me 😉
@tseisel thanks for your example. It works mostly like I'd want it to. I have a related question: If I change
```kotlin
val source: ReceiveChannel<Int> = produce<Int> {
    for (n in -1..2) {
        send(n)
        delay(100)
    }
}
```
to
```kotlin
val source = Channel<Int>(Channel.BUFFERED)

launch {
    for (n in -1..2) {
        source.send(n)
        delay(100)
    }
}
```
then the script never terminates: it runs forever, or it suspends somewhere but I don't know where, when or why. Do you know why?

tseisel

03/29/2020, 9:12 PM
`produce` implicitly calls `source.close()` when its block is completed. Calling `source.close()` after the for loop in the `launch` block should do the trick!
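A minimal sketch of that fix, assuming a plain consumer loop in place of the collectors from the linked playground (the function name `drain` is hypothetical). Without the explicit `close()`, the consuming loop keeps waiting for more elements and `runBlocking` never returns.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: sends -1..2 through a channel and returns what was received.
fun drain(): List<Int> = runBlocking {
    val source = Channel<Int>(Channel.BUFFERED)

    launch {
        for (n in -1..2) {
            source.send(n)
            delay(100)
        }
        // Unlike produce { }, launch { } does not close the channel for us.
        source.close()
    }

    val seen = mutableListOf<Int>()
    // Iteration ends only once the channel is closed and drained.
    for (n in source) seen += n
    seen
}

fun main() {
    println(drain()) // [-1, 0, 1, 2]
}
```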

Erik

03/30/2020, 5:53 AM
Thanks, that's it!