Massimo Carli
11/18/2018, 11:29 PMisClosedForReceive
has more chances to be updated and stop the cycle, otherwise, it won't and I get an exception. Basically, there's a race condition on the isClosedForReceive
property that doesn't allow me to use it as a condition for stop receiving. while (!kotlinChannel.isClosedForReceive) {
val data = kotlinChannel.receive()
delay(1) // NEEDED IN ORDER TO UPDATE (most of the time) isClosedForReceive
println(data)
}
Alexander
11/19/2018, 7:02 AMreceive()
into try-catch.Paul Woitaschek
11/19/2018, 8:39 AMjust
and end with try-catch
😛Massimo Carli
11/19/2018, 10:53 AMreceive
but on the close
. This is because the consumer tries to get new data from the channel and so the producer tries to produce it. The channel is closed so you get an exception on the close which is a suspend function.Alexander
11/19/2018, 11:55 AMsend
, so you should also wrap your send into something like:
try {
kotlinChannel.send(x)
} catch(ex: ClosedSendChannelException) {
break
}
Instead of explicitely call receive in the consumer you can iterate the channel. In this case when chennel is closed the for loop will finish.Massimo Carli
11/19/2018, 2:32 PMisClosedForReceive
didn't work. Using a try/catch for this purposes is an antipattern called Head in the sand
and I prefer not to use it 🙂 Anyway the exception is on the close
because of the send invocation as I wrote 👆. The close Javadoc also mentions this. I think it's probably a bug.Alexander
11/19/2018, 2:42 PMBasically, there's a race condition on the isClosedForReceive property that doesn't allow me to use it as a condition for stop receivingSo you check
isClosedForReceive
then close
is called and then you call receive
Closed[Send|Receive]ChannelException
exceptions the error will go away. And I think it's not an antipattern. The documentation says that receive
and send
can throw such exceptions.Massimo Carli
11/19/2018, 11:04 PMisClosedForReceive
became true if the sender closes the channel and if all the items have been consumed. This is not the case. It needs some time. Adding the delay
or during debug, everything works.@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
fun main() {
val channel = Channel<String>()
val items = arrayOf("1", "2", "3", "4", "5")
runBlocking {
GlobalScope.launch {
for (item in items) {
channel.send(item)
// Conditional close
if (item == "3") {
channel.close()
}
}
}
while (!channel.isClosedForReceive) {
val item = channel.receive()
// delay(10)
println(item)
}
}
}
delay
it works most of the time. Commenting delay
most of the time you get an exception on the close
InputStream
has no more data. It's true that receive
can throw an exception but only if you invoke it while isClosedForReceive
is true that in the previous example should never happen because of the while condition.Alexander
11/20/2018, 9:23 AMisClosedForReceive
is true. Your code is concurrent so close
can be called between isClosedForReceive
and receive
. Putting a delay you just reduce the chance of a race condition. I'm pretty sure that if you write jcstress test for your case with delay it will find the same problem.Olekss
11/20/2018, 11:13 AMMassimo Carli
11/20/2018, 2:22 PM