Massimo Carli
11/18/2018, 11:29 PM
isClosedForReceive has more chances to be updated and stop the cycle; otherwise it won't, and I get an exception. Basically, there's a race condition on the isClosedForReceive property that doesn't allow me to use it as a condition to stop receiving.
while (!kotlinChannel.isClosedForReceive) {
    val data = kotlinChannel.receive()
    delay(1) // NEEDED IN ORDER TO UPDATE (most of the time) isClosedForReceive
    println(data)
}
Alexander
11/19/2018, 7:02 AM
receive() into try-catch.
Paul Woitaschek
11/19/2018, 8:39 AM
just and end with try-catch 😛
Massimo Carli
11/19/2018, 10:53 AM
receive but on the close. This is because the consumer tries to get new data from the channel, so the producer tries to produce it. The channel is closed, so you get an exception on the close, which is a suspend function.
Alexander
11/19/2018, 11:55 AM
send, so you should also wrap your send into something like:
try {
    kotlinChannel.send(x)
} catch (ex: ClosedSendChannelException) {
    break
}
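As a self-contained sketch of that idea, assuming a producer that closes the channel part-way through its own loop: the function name `sendUntilClosed`, the item list, and the three-item consumer are hypothetical and only there to make the snippet runnable.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the items that were sent before the channel was closed.
fun sendUntilClosed(): List<String> = runBlocking {
    val kotlinChannel = Channel<String>()
    val sent = mutableListOf<String>()
    val producer = launch {
        for (x in listOf("1", "2", "3", "4", "5")) {
            try {
                kotlinChannel.send(x)
                sent += x
            } catch (ex: ClosedSendChannelException) {
                break // the channel is closed: stop producing
            }
            // Assumed conditional close, after which the next send() throws
            if (x == "3") kotlinChannel.close()
        }
    }
    // Hypothetical consumer: takes exactly the three items sent before the close
    repeat(3) { kotlinChannel.receive() }
    producer.join()
    sent
}

fun main() {
    println(sendUntilClosed())
}
```

Here the exception comes from the send that follows the close, and catching it is simply how the producer learns it should stop.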
Instead of explicitly calling receive in the consumer, you can iterate over the channel. In that case, when the channel is closed the for loop will finish.
Massimo Carli
11/19/2018, 2:32 PM
isClosedForReceive didn't work. Using a try/catch for this purpose is an antipattern called Head in the sand and I prefer not to use it 🙂 Anyway, the exception is on the close because of the send invocation, as I wrote 👆. The close Javadoc also mentions this. I think it's probably a bug.
Alexander
11/19/2018, 2:42 PM
"Basically, there's a race condition on the isClosedForReceive property that doesn't allow me to use it as a condition to stop receiving"
So you check isClosedForReceive, then close is called, and then you call receive.
Alexander
11/19/2018, 2:51 PM
Closed[Send|Receive]ChannelException exceptions the error will go away. And I think it's not an antipattern. The documentation says that receive and send can throw such exceptions.
Massimo Carli
11/19/2018, 11:04 PM
isClosedForReceive became true if the sender closes the channel and if all the items have been consumed. This is not the case. It needs some time. With the delay added, or while debugging, everything works.
Massimo Carli
11/19/2018, 11:04 PM
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
fun main() {
    val channel = Channel<String>()
    val items = arrayOf("1", "2", "3", "4", "5")
    runBlocking {
        GlobalScope.launch {
            for (item in items) {
                channel.send(item)
                // Conditional close
                if (item == "3") {
                    channel.close()
                }
            }
        }
        while (!channel.isClosedForReceive) {
            val item = channel.receive()
            // delay(10)
            println(item)
        }
    }
}
Massimo Carli
11/19/2018, 11:05 PM
delay it works most of the time. With the delay commented out, most of the time you get an exception on the close.
Massimo Carli
11/19/2018, 11:09 PM
InputStream has no more data. It's true that receive can throw an exception, but only if you invoke it while isClosedForReceive is true, which in the previous example should never happen because of the while condition.
Alexander
11/20/2018, 9:23 AM
isClosedForReceive is true. Your code is concurrent, so close can be called between the isClosedForReceive check and receive. By adding a delay you just reduce the chance of hitting the race condition. I'm pretty sure that if you write a jcstress test for your case with the delay, it will find the same problem.
Olekss
11/20/2018, 11:13 AM
Massimo Carli
11/20/2018, 2:22 PM
Massimo Carli
11/20/2018, 2:25 PM
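For reference, a minimal sketch of the for-loop approach suggested earlier in the thread, applied to the example above. It assumes the producer should stop right after closing the channel, and the function name `receiveAll` is hypothetical. Iterating the channel means there is no separate isClosedForReceive check for close to slip past.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects everything sent before the channel was closed.
fun receiveAll(): List<String> = runBlocking {
    val channel = Channel<String>()
    val items = listOf("1", "2", "3", "4", "5")
    launch {
        for (item in items) {
            channel.send(item)
            // Conditional close
            if (item == "3") {
                channel.close()
                break // stop producing: send() on a closed channel would throw
            }
        }
    }
    val received = mutableListOf<String>()
    // The iterator suspends until an element arrives or the channel is closed,
    // so the loop ends cleanly without checking isClosedForReceive.
    for (item in channel) {
        received += item
    }
    received
}

fun main() {
    println(receiveAll()) // prints [1, 2, 3]
}
```

In recent kotlinx.coroutines versions, receiveCatching() is another way to receive without an exception when the channel is closed.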