#coroutines

Massimo Carli

11/18/2018, 11:29 PM
Question about coroutines. Why doesn't this code work when the channel is closed on the sender side? If I add the `delay()` call shown in the snippet, `isClosedForReceive` has a better chance of being updated and stopping the loop; without it, the loop keeps going and I get an exception. Basically, there's a race condition on the `isClosedForReceive` property that prevents me from using it as the condition to stop receiving.
```kotlin
while (!kotlinChannel.isClosedForReceive) {
    val data = kotlinChannel.receive()
    delay(1) // NEEDED IN ORDER TO UPDATE (most of the time) isClosedForReceive
    println(data)
}
```

Alexander

11/19/2018, 7:02 AM
Just wrap `receive()` in a try-catch.

Paul Woitaschek

11/19/2018, 8:39 AM
The best answers start with "just" and end with "try-catch" 😛
😂 2

Massimo Carli

11/19/2018, 10:53 AM
Anyway, the exception is not in the `receive` but on the `close`. This is because the consumer tries to get new data from the channel, so the producer tries to produce it. The channel is closed, so you get an exception on the close, which is a suspend function.

Alexander

11/19/2018, 11:55 AM
I think the exception is on `send`, so you should also wrap your send in something like:
```kotlin
try {
    kotlinChannel.send(x)
} catch (ex: ClosedSendChannelException) {
    break // stop producing once the channel has been closed
}
```
Instead of explicitly calling `receive` in the consumer, you can iterate over the channel. In that case, when the channel is closed the for loop will finish.
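
For readers following along, the iteration suggested above might look like the minimal sketch below. The function name `consumeByIteration` and the three-item producer are assumptions made up for illustration; they are not from the original code in this thread.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns everything the consumer received.
fun consumeByIteration(): List<String> = runBlocking {
    val channel = Channel<String>()
    val received = mutableListOf<String>()

    // Producer: sends three items, then closes the channel.
    launch {
        for (item in listOf("1", "2", "3")) {
            channel.send(item)
        }
        channel.close()
    }

    // Consumer: the for loop suspends for each item and ends on its own
    // once the channel is closed and drained; no isClosedForReceive check.
    for (item in channel) {
        received += item
    }
    received
}

fun main() {
    println(consumeByIteration()) // prints [1, 2, 3]
}
```

The loop condition and the receive are a single operation here, so there is no gap in which `close` can slip in between a check and a receive.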

Massimo Carli

11/19/2018, 2:32 PM
Thank you @Alexander. The iteration works fine, but I wanted to understand why `isClosedForReceive` didn't work. Using a try/catch for this purpose is an antipattern called "Head in the sand" and I prefer not to use it 🙂 Anyway, the exception is on the `close` because of the send invocation, as I wrote 👆. The close Javadoc also mentions this. I think it's probably a bug.

Alexander

11/19/2018, 2:42 PM
You've already written the answer:
"Basically, there's a race condition on the isClosedForReceive property that doesn't allow me to use it as a condition for stop receiving"
So you check `isClosedForReceive`, then `close` is called, and then you call `receive`.
I'm pretty sure that if you catch the corresponding `Closed[Send|Receive]ChannelException` exceptions, the error will go away. And I don't think it's an antipattern. The documentation says that `receive` and `send` can throw such exceptions.

Massimo Carli

11/19/2018, 11:04 PM
The documentation says that `isClosedForReceive` becomes true if the sender closes the channel and all the items have been consumed. This is not the case. It needs some time. With the `delay` added, or while debugging, everything works.
You can try it here:
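```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
fun main() {
  val channel = Channel<String>()

  val items = arrayOf("1", "2", "3", "4", "5")

  runBlocking {
    // Producer: runs concurrently with the consumer loop below
    GlobalScope.launch {
      for (item in items) {
        channel.send(item)
        // Conditional close
        if (item == "3") {
          channel.close()
        }
      }
    }
    // Consumer: checks the flag, then receives (two separate steps)
    while (!channel.isClosedForReceive) {
      val item = channel.receive()
      // delay(10)
      println(item)
    }
  }
}
```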
With the `delay` uncommented, it works most of the time. With the `delay` commented out, most of the time you get an exception on the `close`.
Anyway, using exceptions that way is an antipattern. For instance, I don't think you would use an exception to detect that an `InputStream` has no more data. It's true that `receive` can throw an exception, but only if you invoke it while `isClosedForReceive` is true, which in the previous example should never happen because of the while condition.

Alexander

11/20/2018, 9:23 AM
But you do call it exactly when `isClosedForReceive` is true. Your code is concurrent, so `close` can be called between the `isClosedForReceive` check and the `receive` call. By adding a delay you just reduce the chance of hitting the race condition. I'm pretty sure that if you write a jcstress test for your case with the delay, it will find the same problem.
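
The interleaving described above can be reproduced deterministically. The sketch below is an illustration, not the original program: it runs in a single coroutine, and the `close()` call placed between the check and the receive stands in for the producer closing the channel concurrently. The name `checkThenReceive` is hypothetical, and the `@OptIn` line assumes a recent kotlinx.coroutines version in which `isClosedForReceive` carries an opt-in annotation.

```kotlin
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns (value of the flag at check time, whether receive threw).
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
fun checkThenReceive(): Pair<Boolean, Boolean> = runBlocking {
    val channel = Channel<String>()

    // Step 1: the consumer checks the flag. The channel is still open.
    val closedAtCheck = channel.isClosedForReceive

    // Step 2: stand-in for the producer calling close() on another thread
    // right after the consumer's check.
    channel.close()

    // Step 3: the consumer acts on the stale answer and calls receive().
    val receiveThrew = try {
        channel.receive()
        false
    } catch (e: ClosedReceiveChannelException) {
        true
    }

    closedAtCheck to receiveThrew
}

fun main() {
    println(checkThenReceive()) // prints (false, true)
}
```

The flag was accurate when it was read; it simply stopped being true before `receive()` ran, which is why a delay only narrows the window instead of closing it.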

Olekss

11/20/2018, 11:13 AM
Then there isn't really a useful case for "isClosedForReceive", because you can't rely on it: if you act on this info, you may not get what you asked for. So this check is redundant... which leaves one with only one option: try to receive, and react to the outcome.
👍 1
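
One way to "try to receive and react to the outcome" without a try/catch is `receiveCatching()`, which is an assumption about the reader's setup: it exists in kotlinx.coroutines 1.5 and later, so it was not available when this thread was written. The sketch below uses a hypothetical helper name and a made-up three-item producer.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns everything the consumer received.
fun consumeWithReceiveCatching(): List<String> = runBlocking {
    val channel = Channel<String>()
    val received = mutableListOf<String>()

    // Producer: sends three items, then closes the channel.
    launch {
        for (item in listOf("1", "2", "3")) {
            channel.send(item)
        }
        channel.close()
    }

    while (true) {
        // receiveCatching() returns a ChannelResult instead of throwing
        // when the channel is closed.
        val result = channel.receiveCatching()
        // getOrNull() yields the element, or null once the channel is closed.
        val item = result.getOrNull() ?: break
        received += item
    }
    received
}

fun main() {
    println(consumeWithReceiveCatching()) // prints [1, 2, 3]
}
```

The closed state is reported as part of the receive result itself, so there is no separate check that can go stale.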

Massimo Carli

11/20/2018, 2:22 PM
Yeah, that's the conclusion. Basically, that property is not reliable because of a thread visibility problem. "...there is not really useful case for..." means it's useless. The only case where it's reliable is when the one who closes the channel is the consumer. The same goes for the producer and `isClosedForSend`, but as @Olekss said, it's redundant.
Anyway, thank you all for the interesting discussion 🙂