# coroutines
s
Hi all - I'm trying to make sense of `BroadcastChannel` and how to use it. I have a pipeline of channel -> channel -> broadcast channel -> multiple observers / consumers -> channel (aka sink). I want to collect all the processing results in that last channel and return once every message from it has been consumed. The last line in my code is `return@runBlocking tagsChannel.toList()`. Problem is, when I run my code I can see things being emitted from one channel to the next, but it runs forever without any of the broadcast channel subscribers receiving a message. What am I missing here?
d
Code example?
s
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.text.BreakIterator
import java.util.*
import kotlin.collections.ArrayList

class Test() {

    @ExperimentalCoroutinesApi
    fun annotate(text: String): List<String> {
        val tagsChannel = Channel<String>()
        val shinglesChannel = BroadcastChannel<String>(1000)
        val sentencesChannel = Channel<String>()

        return runBlocking {

            launch {
                shinglesChannel.consumeEach {
                    println("Hebrew: " + it)
                    tagsChannel.send("Foo")
                }
            }
            launch {
                shinglesChannel.consumeEach {t ->
                    println("English: " + t)
                    tagsChannel.send("Bar")
                }
            }

            launch {
                sentencesChannel.consumeEach { text ->
                    repeat(10) {
                        shinglesChannel.send("Foo bar")
                    }
                }
                shinglesChannel.close()
            }

            launch {
                val breakIterator = BreakIterator.getSentenceInstance(Locale("he"))
                breakIterator.setText(text)

                val start = breakIterator.first()
                var end = breakIterator.next()
                while (end != BreakIterator.DONE) {
                    val sentence = text.substring(start, end).trim { it <= ' ' }
                    sentencesChannel.send(sentence)
                    end = breakIterator.next()
                }
                sentencesChannel.close()
            }.join()

            val ret = ArrayList<String>()
            launch {
                tagsChannel.consumeEach {
                    println("test")
                    ret.add(it)
                }
                tagsChannel.close()
            }.join()
            return@runBlocking ret
        }
    }
}

@ExperimentalCoroutinesApi
fun main() {
    val annotator = Test()
    val ret = annotator.annotate("Test one two three")
}
d
I think you have to call `openSubscription` on the broadcast channel.
s
`consumeEach` does it for you, at least that's what the internet says. I do get the messages, but this code hangs and never finishes, and I'm not sure why.
I do close the root channel, so that should have propagated the close to all the others, no?
d
Oh I see the issue now.
Your final `launch` is waiting for the previous lines of code to execute.
But the previous code is waiting for `tagsChannel` to be consumed, which the final `launch` is supposed to be doing.
i.e. the code should execute fine once you remove the first `.join()`.
(Although using `launch {}.join()` is rather suspicious code.)
s
It still hangs. I actually started adding the `.join()`s after trying to figure out why it hangs. It also hangs when all joins are removed.
d
Found the second issue.
tagsChannel.consumeEach {
    println("test")
    ret.add(it)
}
tagsChannel.close()
`consumeEach` doesn't return until `close`/`cancel` is called.
s
close was called on the root channel: `sentencesChannel.close()`
d
Yes, but not on `tagsChannel`.
s
ok I removed it
val ret = ArrayList<String>()
            launch {
                tagsChannel.consumeEach {
                    println("test")
                    ret.add(it)
                }
            }
            return@runBlocking ret
still hangs
d
No, I meant `tagsChannel.close()` is not being called.
s
I had it before...
val ret = ArrayList<String>()
            launch {
                tagsChannel.consumeEach {
                    println("test")
                    ret.add(it)
                }
                tagsChannel.close()
            }
            return@runBlocking ret
still hangs
d
Yes, because `tagsChannel.consumeEach` waits until `tagsChannel.close()` is called. And here you only call `tagsChannel.close()` after `tagsChannel.consumeEach`.
You've created a circular dependency.
s
uhm, because I don't have a way to know the emission has stopped
but if I call `sentencesChannel.close()` upstream, doesn't it propagate the close message downstream?
d
Yes, but only to coroutines directly listening to `sentencesChannel`.
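To make that concrete, here is a minimal sketch of a pipeline in which every stage closes its own output once its input is exhausted, so the end of the stream travels downstream one hop at a time. The `stage` helper, `runPipeline`, and the trim/uppercase transforms are hypothetical stand-ins, not part of the code above, and plain `Channel`s are used instead of a broadcast channel to keep it short.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stage: forwards every transformed item, then closes its own output.
fun CoroutineScope.stage(
    input: ReceiveChannel<String>,
    output: SendChannel<String>,
    transform: (String) -> String,
): Job = launch {
    try {
        // This loop ends only when `input` has been closed and drained.
        for (item in input) output.send(transform(item))
    } finally {
        // Closing `input` does not close `output`; each stage has to do it.
        output.close()
    }
}

fun runPipeline(items: List<String>): List<String> = runBlocking {
    val sentences = Channel<String>()
    val shingles = Channel<String>()
    val tags = Channel<String>()

    // Root producer: sends everything, then closes only its own channel.
    launch {
        items.forEach { sentences.send(it) }
        sentences.close()
    }
    stage(sentences, shingles) { it.trim() }
    stage(shingles, tags) { it.uppercase() }

    // Terminates because the last stage closes `tags`.
    val result = mutableListOf<String>()
    for (tag in tags) result.add(tag)
    result
}

fun main() {
    println(runPipeline(listOf(" foo ", " bar ")))
}
```

If the `output.close()` call is dropped from `stage`, the final loop over `tags` suspends forever, which is the same kind of hang as in the code above.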
A quick and dirty workaround is to do another `launch` that will call `join` on the two `launch`es and call `tagsChannel.close()` after.
val hack1 = launch {
    shinglesChannel.consumeEach {
        println("Hebrew: $it")
        tagsChannel.send("Foo")
    }
}
val hack2 = launch {
    shinglesChannel.consumeEach { t ->
        println("English: " + t)
        tagsChannel.send("Bar")
    }
}
launch {
    tagsChannel.consume {
        hack1.join()
        hack2.join()
    }
}
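The same idea can also be written with an explicit `close()` instead of `consume { }`. This is only a sketch under simplifying assumptions: the two workers send fixed strings rather than reading from a broadcast channel, and `collectTags` is a hypothetical name.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: two workers feed one sink channel, and a third
// coroutine closes the sink once both workers have finished.
fun collectTags(): List<String> = runBlocking {
    val tags = Channel<String>()

    // Stand-ins for the "Hebrew" and "English" consumers.
    val workers = listOf("Foo", "Bar").map { tag ->
        launch { repeat(3) { tags.send(tag) } }
    }

    // Close from the sending side, after every sender is done.
    launch {
        workers.joinAll()
        tags.close()
    }

    // The loop ends when the channel is closed and drained.
    val result = mutableListOf<String>()
    for (tag in tags) result.add(tag)
    result
}

fun main() {
    println(collectTags().sorted())
}
```

The point of the design is that the channel is closed by the side that knows no more sends are coming, never by the coroutine that is waiting to receive.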
The "correct"/nice solution would be to use `Flow` or some `CoroutineScope` passing, depending on what your actual code is doing.
s
I'll try what you suggested
The idea behind the code is mostly to arrange what could have been very messy Java code into a nicely ordered "stream processing" way of thinking. The code you see here runs within a single method call, so the motivation is not performance from running lengthy code in coroutines, but keeping the code clean and extensible.
d
Yeah, then `Flow` is probably the answer.
s
The code you wrote also hangs
d
Oh. Doesn't hang when I run it.
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consume
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.text.BreakIterator
import java.util.*
import kotlin.collections.ArrayList

class Test() {

    @ExperimentalCoroutinesApi
    fun annotate(text: String): List<String> {
        val tagsChannel = Channel<String>()
        val shinglesChannel = BroadcastChannel<String>(1000)
        val sentencesChannel = Channel<String>()

        return runBlocking {
            val hack1 = launch {
                shinglesChannel.consumeEach {
                    println("Hebrew: $it")
                    tagsChannel.send("Foo")
                }
            }
            val hack2 = launch {
                shinglesChannel.consumeEach { t ->
                    println("English: " + t)
                    tagsChannel.send("Bar")
                }
            }
            launch {
                tagsChannel.consume {
                    hack1.join()
                    hack2.join()
                }
            }

            launch {
                sentencesChannel.consumeEach { text ->
                    repeat(10) {
                        shinglesChannel.send("Foo bar")
                    }
                }
                shinglesChannel.close()
            }

            launch {
                val breakIterator = BreakIterator.getSentenceInstance(Locale("he"))
                breakIterator.setText(text)

                val start = breakIterator.first()
                var end = breakIterator.next()
                while (end != BreakIterator.DONE) {
                    val sentence = text.substring(start, end).trim { it <= ' ' }
                    sentencesChannel.send(sentence)
                    end = breakIterator.next()
                }
                sentencesChannel.close()
            }.join()

            val ret = ArrayList<String>()
            launch {
                tagsChannel.consumeEach {
                    println("test")
                    ret.add(it)
                }
                tagsChannel.close()
            }.join()
            return@runBlocking ret
        }
    }
}

@ExperimentalCoroutinesApi
fun main() {
    val annotator = Test()
    val ret = annotator.annotate("Test one two three")
}
s
Yeah your code works. One sec, looking.
Oh it's because I removed the last launch block... thanks so much for the help!
can you briefly explain what `Flow` is and how it's different from what we just wrote together?
d
`Flow`s are basically lazy channels that handle resource management for you, i.e. `close`/`cancel`. They also have nice stream operators.
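As a rough illustration, the pipeline might look something like this with `Flow`. It is a sketch under assumptions, not a drop-in replacement: `sentences`, `shingles`, and `tags` are hypothetical helpers, sentence splitting is a naive split on "." rather than `BreakIterator`, and the two broadcast subscribers are modelled as two emissions per shingle.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the sentence splitter.
fun sentences(text: String): Flow<String> =
    text.split(".").map { it.trim() }.filter { it.isNotEmpty() }.asFlow()

// Hypothetical shingling step: one emission per word.
fun Flow<String>.shingles(): Flow<String> = transform { sentence ->
    sentence.split(" ").forEach { word -> emit(word) }
}

// Each shingle is tagged by both "subscribers".
fun Flow<String>.tags(): Flow<String> = transform { shingle ->
    emit("Hebrew:$shingle")
    emit("English:$shingle")
}

fun annotate(text: String): List<String> = runBlocking {
    // Nothing runs until toList() collects; there is no close() to call.
    sentences(text).shingles().tags().toList()
}

fun main() {
    println(annotate("Test one. Two three."))
}
```

Because the flow is cold, `toList()` drives the whole chain and returns when the upstream completes, so there is no separate sink channel whose closing has to be coordinated.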
s
cool, thanks again!
d
No problem!