#coroutines

synhershko

08/25/2019, 9:19 PM
Hi all, I'm trying to make sense of `BroadcastChannel` and how to use it. I have a pipeline of channel -> channel -> broadcast channel -> multiple observers/consumers -> channel (the sink). I want to collect all the processing results in that last channel and return once all of its messages have been consumed. The last line in my code is `return@runBlocking tagsChannel.toList()`. The problem is that when I run my code I can see things being emitted from one channel to another, but the code runs forever without any of the `BroadcastChannel` subscribers receiving any message. What am I missing here?

Dominaezzz

08/25/2019, 10:02 PM
Code example?

synhershko

08/25/2019, 10:22 PM
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.text.BreakIterator
import java.util.*
import kotlin.collections.ArrayList

class Test() {

    @ExperimentalCoroutinesApi
    fun annotate(text: String): List<String> {
        val tagsChannel = Channel<String>()
        val shinglesChannel = BroadcastChannel<String>(1000)
        val sentencesChannel = Channel<String>()

        return runBlocking {

            launch {
                shinglesChannel.consumeEach {
                    println("Hebrew: " + it)
                    tagsChannel.send("Foo")
                }
            }
            launch {
                shinglesChannel.consumeEach {t ->
                    println("English: " + t)
                    tagsChannel.send("Bar")
                }
            }

            launch {
                sentencesChannel.consumeEach { text ->
                    repeat(10) {
                        shinglesChannel.send("Foo bar")
                    }
                }
                shinglesChannel.close()
            }

            launch {
                val breakIterator = BreakIterator.getSentenceInstance(Locale("he"))
                breakIterator.setText(text)

                var start = breakIterator.first()
                var end = breakIterator.next()
                while (end != BreakIterator.DONE) {
                    val sentence = text.substring(start, end).trim { it <= ' ' }
                    sentencesChannel.send(sentence)
                    // Advance the start boundary so each sentence is sent only once.
                    start = end
                    end = breakIterator.next()
                }
                sentencesChannel.close()
            }.join()

            val ret = ArrayList<String>()
            launch {
                tagsChannel.consumeEach {
                    println("test")
                    ret.add(it)
                }
                tagsChannel.close()
            }.join()
            return@runBlocking ret
        }
    }
}

@ExperimentalCoroutinesApi
fun main() {
    val annotator = Test()
    val ret = annotator.annotate("Test one two three")
}

Dominaezzz

08/26/2019, 10:52 AM
I think you have to call `openSubscription` on the broadcast channel.

synhershko

08/26/2019, 3:06 PM
`consumeEach` does it for you, at least that's what the internet says. I do get the messages, but this code hangs and never finishes, and I'm not sure why.
I do close the root channel, so it should have propagated the close to all the others, no?

Dominaezzz

08/26/2019, 3:13 PM
Oh, I see the issue now.
Your final `launch` is waiting for the previous lines of code to execute.
But the previous code is waiting for the `tagsChannel` to be consumed.
Which the final `launch` is supposed to be doing.
i.e. the code should execute fine once you remove the first `.join()`.
(Although using `launch {}.join()` is rather suspicious code.)

synhershko

08/26/2019, 7:05 PM
It still hangs. I actually started adding the `.join()`s after trying to figure out why it hangs. It also hangs when all joins are removed.

Dominaezzz

08/26/2019, 7:35 PM
Found the second issue.
tagsChannel.consumeEach {
    println("test")
    ret.add(it)
}
tagsChannel.close()
`consumeEach` doesn't return until `close`/`cancel` is called.

synhershko

08/26/2019, 7:42 PM
`close` was called on the root channel: `sentencesChannel.close()`

Dominaezzz

08/26/2019, 7:42 PM
Yes, but not on `tagsChannel`.

synhershko

08/26/2019, 7:43 PM
ok I removed it
val ret = ArrayList<String>()
launch {
    tagsChannel.consumeEach {
        println("test")
        ret.add(it)
    }
}
return@runBlocking ret
still hangs

Dominaezzz

08/26/2019, 7:43 PM
No, I meant `tagsChannel.close()` is not being called.

synhershko

08/26/2019, 7:44 PM
I had it before...
val ret = ArrayList<String>()
launch {
    tagsChannel.consumeEach {
        println("test")
        ret.add(it)
    }
    tagsChannel.close()
}
return@runBlocking ret
still hangs

Dominaezzz

08/26/2019, 7:45 PM
Yes, because `tagsChannel.consumeEach` waits until `tagsChannel.close()` is called. And here you only call `tagsChannel.close()` after `tagsChannel.consumeEach`.
You've created a circular dependency.

synhershko

08/26/2019, 7:47 PM
Uhm, because I don't have a way to know that emission has stopped.
But if I call `sentencesChannel.close()` upstream, doesn't it propagate the close message downstream?

Dominaezzz

08/26/2019, 7:49 PM
Yes, but only to coroutines directly listening to `sentencesChannel`.
A quick and dirty workaround is to do another `launch` that will call `join` on the two `launch`es and call `tagsChannel.close()` after.
val hack1 = launch {
    shinglesChannel.consumeEach {
        println("Hebrew: $it")
        tagsChannel.send("Foo")
    }
}
val hack2 = launch {
    shinglesChannel.consumeEach { t ->
        println("English: " + t)
        tagsChannel.send("Bar")
    }
}
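launch {
    // consume cancels tagsChannel once this block returns,
    // i.e. after both subscribers have finished.
    tagsChannel.consume {
        hack1.join()
        hack2.join()
    }
}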
The "correct"/nice solution would be to use `Flow` or some `CoroutineScope` passing, depending on what your actual code is doing.
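The join-then-close idea can also be sketched with an explicit `close()` instead of `consume`. This is an assumption-laden illustration, not the code from the thread: `fanIn`, `workers` and the tag names are hypothetical, and two plain loops stand in for the two `BroadcastChannel` subscribers.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.toList
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper, for illustration only.
fun fanIn(inputs: List<String>): List<String> = runBlocking {
    val tags = Channel<String>()

    // Two workers write into the same sink channel.
    val workers = listOf("Foo", "Bar").map { tag ->
        launch {
            for (input in inputs) tags.send("$tag:$input")
        }
    }

    // Closer: waits for every worker to finish, then closes the sink
    // so that the collector below can complete.
    launch {
        workers.joinAll()
        tags.close()
    }

    // Suspends until the channel is closed, then returns everything received.
    tags.toList()
}

fun main() {
    // The interleaving of "Foo" and "Bar" items depends on scheduling.
    println(fanIn(listOf("a", "b")))
}
```

Closing lets the collector finish normally, whereas `consume` ends it by cancelling the channel.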

synhershko

08/26/2019, 8:00 PM
I'll try what you suggested.
The idea behind the code is mostly to arrange what could have been very messy Java code into a nicely ordered "stream processing" way of thinking. The code you see here runs within a single method call, so the motivation is not performance from running lengthy code in coroutines, but keeping the code clean and extensible.

Dominaezzz

08/26/2019, 8:03 PM
Yeah, then `Flow` is probably the answer.

synhershko

08/26/2019, 8:04 PM
The code you wrote also hangs

Dominaezzz

08/26/2019, 8:05 PM
Oh. Doesn't hang when I run it.
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consume
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.text.BreakIterator
import java.util.*
import kotlin.collections.ArrayList

class Test() {

    @ExperimentalCoroutinesApi
    fun annotate(text: String): List<String> {
        val tagsChannel = Channel<String>()
        val shinglesChannel = BroadcastChannel<String>(1000)
        val sentencesChannel = Channel<String>()

        return runBlocking {
            val hack1 = launch {
                shinglesChannel.consumeEach {
                    println("Hebrew: $it")
                    tagsChannel.send("Foo")
                }
            }
            val hack2 = launch {
                shinglesChannel.consumeEach { t ->
                    println("English: " + t)
                    tagsChannel.send("Bar")
                }
            }
            launch {
                tagsChannel.consume {
                    hack1.join()
                    hack2.join()
                }
            }

            launch {
                sentencesChannel.consumeEach { text ->
                    repeat(10) {
                        shinglesChannel.send("Foo bar")
                    }
                }
                shinglesChannel.close()
            }

            launch {
                val breakIterator = BreakIterator.getSentenceInstance(Locale("he"))
                breakIterator.setText(text)

                var start = breakIterator.first()
                var end = breakIterator.next()
                while (end != BreakIterator.DONE) {
                    val sentence = text.substring(start, end).trim { it <= ' ' }
                    sentencesChannel.send(sentence)
                    // Advance the start boundary so each sentence is sent only once.
                    start = end
                    end = breakIterator.next()
                }
                sentencesChannel.close()
            }.join()

            val ret = ArrayList<String>()
            launch {
                tagsChannel.consumeEach {
                    println("test")
                    ret.add(it)
                }
                tagsChannel.close()
            }.join()
            return@runBlocking ret
        }
    }
}

@ExperimentalCoroutinesApi
fun main() {
    val annotator = Test()
    val ret = annotator.annotate("Test one two three")
}

synhershko

08/26/2019, 8:07 PM
Yeah your code works. One sec, looking.
Oh it's because I removed the last launch block... thanks so much for the help!
Can you briefly explain what `Flow` is and how it's different from what we just wrote together?

Dominaezzz

08/26/2019, 8:12 PM
`Flow`s are basically lazy channels that handle resource management (i.e. `close`/`cancel`) for you. They also have nice stream operators.
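As a rough sketch of how the same pipeline might look with `Flow` (an assumption, not code from the thread): the stage names `sentences`, `shingles` and `tags` are hypothetical, and naive string splitting replaces the `BreakIterator` logic to keep the example short. Nothing runs until the terminal `toList()` is called, and there is no channel to close or cancel.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// Stage 1 (hypothetical): split the text into sentences.
fun sentences(text: String): Flow<String> =
    text.split(".").map { it.trim() }.filter { it.isNotEmpty() }.asFlow()

// Stage 2 (hypothetical): emit one "shingle" per word of each sentence.
fun Flow<String>.shingles(): Flow<String> = transform { sentence ->
    sentence.split(" ").forEach { emit(it) }
}

// Stage 3 (hypothetical): both "subscribers" tag every shingle.
fun Flow<String>.tags(): Flow<String> = transform { shingle ->
    emit("Hebrew:$shingle")
    emit("English:$shingle")
}

// The flow is cold: the stages only run when toList() collects them,
// and the flow completes on its own when the source is exhausted.
fun annotate(text: String): List<String> = runBlocking {
    sentences(text).shingles().tags().toList()
}

fun main() {
    println(annotate("One two. Three."))
    // prints [Hebrew:One, English:One, Hebrew:two, English:two, Hebrew:Three, English:Three]
}
```

Each stage here is an ordinary extension function, so adding a processing step means adding one more call to the chain.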

synhershko

08/26/2019, 8:13 PM
cool, thanks again!
No problem!