synhershko
08/25/2019, 9:19 PMreturn@runBlocking tagsChannel.toList()
.
Problem is, I run my code and while I can see things being emitted from one channel to another, the code runs forever without any of the broadcastchannel subscribers receiving any message.
What am I missing here?Dominaezzz
08/25/2019, 10:02 PMsynhershko
08/25/2019, 10:22 PMimport kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.text.BreakIterator
import java.util.*
import kotlin.collections.ArrayList
class Test() {
@ExperimentalCoroutinesApi
fun annotate(text: String): List<String> {
val tagsChannel = Channel<String>()
val shinglesChannel = BroadcastChannel<String>(1000)
val sentencesChannel = Channel<String>()
return runBlocking {
launch {
shinglesChannel.consumeEach {
println("Hebrew: " + it)
tagsChannel.send("Foo")
}
}
launch {
shinglesChannel.consumeEach {t ->
println("English: " + t)
tagsChannel.send("Bar")
}
}
launch {
sentencesChannel.consumeEach { text ->
repeat(10) {
shinglesChannel.send("Foo bar")
}
}
shinglesChannel.close()
}
launch {
val breakIterator = BreakIterator.getSentenceInstance(Locale("he"))
breakIterator.setText(text)
val start = breakIterator.first()
var end = breakIterator.next()
while (end != BreakIterator.DONE) {
val sentence = text.substring(start, end).trim { it <= ' ' }
sentencesChannel.send(sentence)
end = breakIterator.next()
}
sentencesChannel.close()
}.join()
val ret = ArrayList<String>()
launch {
tagsChannel.consumeEach {
println("test")
ret.add(it)
}
tagsChannel.close()
}.join()
return@runBlocking ret
}
}
}
@ExperimentalCoroutinesApi
fun main() {
val annotaor = Test()
val ret = annotaor.annotate("Test one two three")
}
Dominaezzz
08/26/2019, 10:52 AMopenSubscription
on the broadcast channel.synhershko
08/26/2019, 3:06 PMconsumeEach
does it for you , at least that's what the internet says. I do get the messages, but this code hangs and never finishes and I'm not sure why?synhershko
08/26/2019, 3:06 PMDominaezzz
08/26/2019, 3:13 PMDominaezzz
08/26/2019, 3:15 PMlaunch
is waiting for the previous lines of code to execute.Dominaezzz
08/26/2019, 3:15 PMtagsChannel
to be consumed.Dominaezzz
08/26/2019, 3:15 PMlaunch
is supposed to be doing.Dominaezzz
08/26/2019, 3:16 PM.join()
.Dominaezzz
08/26/2019, 3:16 PMlaunch {}.join()
is rather suspicious code).synhershko
08/26/2019, 7:05 PMDominaezzz
08/26/2019, 7:35 PMDominaezzz
08/26/2019, 7:36 PMtagsChannel.consumeEach {
println("test")
ret.add(it)
}
tagsChannel.close()
Dominaezzz
08/26/2019, 7:36 PMconsumeEach
doesn't return until `close`/`cancel`is called.synhershko
08/26/2019, 7:42 PMsentencesChannel.close()
Dominaezzz
08/26/2019, 7:42 PMtagsChannel
.synhershko
08/26/2019, 7:43 PMsynhershko
08/26/2019, 7:43 PMval ret = ArrayList<String>()
launch {
tagsChannel.consumeEach {
println("test")
ret.add(it)
}
}
return@runBlocking ret
synhershko
08/26/2019, 7:43 PMDominaezzz
08/26/2019, 7:43 PMtagsChannel.close()
is not being called.synhershko
08/26/2019, 7:44 PMsynhershko
08/26/2019, 7:44 PMval ret = ArrayList<String>()
launch {
tagsChannel.consumeEach {
println("test")
ret.add(it)
}
tagsChannel.close()
}
return@runBlocking ret
synhershko
08/26/2019, 7:44 PMDominaezzz
08/26/2019, 7:45 PMtagsChannel.consumeEach
waits until tagsChannel.close()
is called. And here you only call tagsChannel.close()
after tagsChannel.consumeEach
.Dominaezzz
08/26/2019, 7:45 PMsynhershko
08/26/2019, 7:47 PMsynhershko
08/26/2019, 7:48 PMsentencesChannel.close()
up stream , doesn't it propagate the close message downstream?Dominaezzz
08/26/2019, 7:49 PMsentencesChannel
.Dominaezzz
08/26/2019, 7:51 PMlaunch
that will call join
on the two `launch`es and call tagsChannel.close()
after.Dominaezzz
08/26/2019, 7:54 PMval hack1 = launch {
shinglesChannel.consumeEach {
println("Hebrew: $it")
tagsChannel.send("Foo")
}
}
val hack2 = launch {
shinglesChannel.consumeEach { t ->
println("English: " + t)
tagsChannel.send("Bar")
}
}
launch {
tagsChannel.consume {
hack1.join()
hack2.join()
}
}
Dominaezzz
08/26/2019, 7:56 PMFlow
or some CoroutineScope
passing, depending on what your actual code is doing.synhershko
08/26/2019, 8:00 PMsynhershko
08/26/2019, 8:01 PMDominaezzz
08/26/2019, 8:03 PMFlow
is probably the answer.synhershko
08/26/2019, 8:04 PMDominaezzz
08/26/2019, 8:05 PMDominaezzz
08/26/2019, 8:06 PMimport kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consume
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.text.BreakIterator
import java.util.*
import kotlin.collections.ArrayList
class Test() {
@ExperimentalCoroutinesApi
fun annotate(text: String): List<String> {
val tagsChannel = Channel<String>()
val shinglesChannel = BroadcastChannel<String>(1000)
val sentencesChannel = Channel<String>()
return runBlocking {
val hack1 = launch {
shinglesChannel.consumeEach {
println("Hebrew: $it")
tagsChannel.send("Foo")
}
}
val hack2 = launch {
shinglesChannel.consumeEach { t ->
println("English: " + t)
tagsChannel.send("Bar")
}
}
launch {
tagsChannel.consume {
hack1.join()
hack2.join()
}
}
launch {
sentencesChannel.consumeEach { text ->
repeat(10) {
shinglesChannel.send("Foo bar")
}
}
shinglesChannel.close()
}
launch {
val breakIterator = BreakIterator.getSentenceInstance(Locale("he"))
breakIterator.setText(text)
val start = breakIterator.first()
var end = breakIterator.next()
while (end != BreakIterator.DONE) {
val sentence = text.substring(start, end).trim { it <= ' ' }
sentencesChannel.send(sentence)
end = breakIterator.next()
}
sentencesChannel.close()
}.join()
val ret = ArrayList<String>()
launch {
tagsChannel.consumeEach {
println("test")
ret.add(it)
}
tagsChannel.close()
}.join()
return@runBlocking ret
}
}
}
@ExperimentalCoroutinesApi
fun main() {
val annotaor = Test()
val ret = annotaor.annotate("Test one two three")
}
synhershko
08/26/2019, 8:07 PMsynhershko
08/26/2019, 8:08 PMsynhershko
08/26/2019, 8:09 PMFlow
is and how it's different from what we just wrote together?Dominaezzz
08/26/2019, 8:12 PMFlow
are basically lazy channels, that handle resource management for you. i.e `close`/`cancel`. They also have nice stream operatorssynhershko
08/26/2019, 8:13 PMDominaezzz
08/26/2019, 8:13 PMDominaezzz
08/26/2019, 8:13 PM