
eygraber

03/31/2023, 8:42 PM
When using `channelFlow`, is there a way to guarantee that `send` won't throw because it was called after the channel was closed? Before coroutines 1.7 I used `if(!isClosedForSend) send(...)`, but now that is annotated as `DelicateCoroutinesApi`, and I see why it's not the best approach (although long term it did solve the issue where nearly any place I used `send` would eventually crash due to the receiver getting cancelled). Should I just always use `trySend`?

kevin.cianfarini

03/31/2023, 9:06 PM
Depending on your use case, always using `trySend` could break backpressure, right?
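A minimal sketch of that trade-off, assuming a channel with a buffer of one: `trySend` never suspends, so when the buffer is full the element is simply not sent, whereas `send` would wait for the receiver. The `describe` and `trySendDemo` names are made up for illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ChannelResult

// Hypothetical helper: turns a trySend result into a readable label.
fun <T> describe(result: ChannelResult<T>): String = when {
    result.isSuccess -> "sent"
    result.isClosed -> "closed"
    else -> "dropped (buffer full)"
}

fun trySendDemo(): List<String> {
    val channel = Channel<Int>(capacity = 1)
    val outcomes = mutableListOf<String>()
    outcomes += describe(channel.trySend(1)) // fits in the buffer
    outcomes += describe(channel.trySend(2)) // buffer is full: send would suspend, trySend gives up
    channel.close()
    outcomes += describe(channel.trySend(3)) // closed: reported as a result, nothing is thrown
    return outcomes
}

fun main() {
    println(trySendDemo()) // [sent, dropped (buffer full), closed]
}
```

The second element is lost rather than delayed, which is the backpressure concern raised here.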

eygraber

03/31/2023, 9:07 PM
Yup, but that sounds better than consistently crashing.
Maybe the better solution is a function in between `send` and `trySend`: one with all of the behavior of `send`, except that it doesn't throw if you call it after `close`.

kevin.cianfarini

03/31/2023, 9:16 PM
You could potentially just use try/catch, too.

eygraber

03/31/2023, 9:24 PM
True, but you have to remember not to catch `CancellationException`s. I used to have a wrapper called `offerCatching` to get around the fact that `offer` threw as well. I guess I could go back to that, but it's annoying to have to do that and to be vigilant about not using `send`.
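A rough sketch of what such a wrapper might look like for `send`. The name `sendUnlessClosed` is hypothetical and not part of kotlinx.coroutines; the sketch assumes that only `ClosedSendChannelException` should be swallowed, so cancellation still propagates normally.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.runBlocking

// Hypothetical helper, not part of kotlinx.coroutines.
// Suspends like `send`, but reports a plain `close()` as `false` instead of throwing.
// CancellationException and any custom close cause are not caught and still propagate.
suspend fun <E> SendChannel<E>.sendUnlessClosed(element: E): Boolean =
    try {
        send(element)
        true
    } catch (e: ClosedSendChannelException) {
        false
    }

fun sendUnlessClosedDemo(): List<Boolean> = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    val beforeClose = channel.sendUnlessClosed(1) // buffered, returns true
    channel.close()
    val afterClose = channel.sendUnlessClosed(2)  // exception swallowed, returns false
    listOf(beforeClose, afterClose)
}

fun main() {
    println(sendUnlessClosedDemo()) // [true, false]
}
```

Catching the specific exception type avoids the pitfall of accidentally catching `CancellationException`, which would happen with a broader `catch (e: IllegalStateException)`.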

kevin.cianfarini

03/31/2023, 9:41 PM
`ClosedSendChannelException` is not a subclass of `CancellationException`. Just don’t catch `Throwable` or `IllegalStateException`. https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-closed-send-channel-exception/
Oh wait

eygraber

03/31/2023, 9:42 PM
Right, that's what I meant

kevin.cianfarini

03/31/2023, 9:42 PM
I retract my statement
A failed channel rethrows the original close cause exception on send attempts.
This exception is a subclass of IllegalStateException, because, conceptually, it is the sender’s responsibility to close the channel and not try to send anything thereafter. Attempts to send to a closed channel indicate a logical error in the sender’s code.
Is it possible that you should be using `awaitClose` to unregister the underlying callback prior to trying to send?
I assumed you were already doing that but you might not be

eygraber

03/31/2023, 9:48 PM
I'm not using `awaitClose` in this case because the body of the `channelFlow` is wrapped in a `coroutineScope` (I can post a sample soon). What I'm worried about is the doc for `isClosedForSend`, which mentions that (emphasis mine):
Returns true if this channel was closed by an invocation of close or its receiving side was cancelled. This means that calling send will result in an exception.

kevin.cianfarini

03/31/2023, 9:50 PM
I’m not quite sure how wrapping anything in `coroutineScope` would preclude you from using `awaitClose`.

eygraber

03/31/2023, 9:52 PM
I don't see how `awaitClose` would help here. I'm calling `send` from another flow's `collect`. Here's the code:
channelFlow {
  coroutineScope {
    val stateFlow = flow.stateIn(this)

    val fastPath = launch {
      stateFlow.collect { value ->
        send(Loadable.Loaded(value))
      }
    }

    launch {
      delay(initialToLoadingMillis)
      fastPath.cancel()
      if(stateFlow.value is Loadable.Initial<*>) {
        send(Loadable.Loading(placeholder))
        delay(minLoadingDurationMillis)
      }
      stateFlow.collect { value ->
        send(Loadable.Loaded(value))
      }
    }
  }
}

kevin.cianfarini

03/31/2023, 9:55 PM
Could you potentially do:
channelFlow {
  val job = launch {
    // do the body of coroutineScope here
  }

  awaitClose { job.cancel() }
}
That would stop the constituent flows from trying to send (I think)
Docs from `awaitClose`, emphasis mine:
Suspends the current coroutine until the channel is either closed or cancelled and *invokes the given block before resuming the coroutine.*

eygraber

03/31/2023, 10:00 PM
I think there might still be a window for throwing `ClosedSendChannelException`, because the `send` docs say:
Note that this function does not check for cancellation when it is not suspended. Use yield or CoroutineScope.isActive to periodically check for cancellation in tight loops if needed.
So even if `awaitClose` cancels the job, `send` could still follow through with its call. I'm not sure if the channel is considered closed or not at that point, but if it is, then `send` will throw

kevin.cianfarini

03/31/2023, 10:01 PM
If `awaitClose` cancels the job, then the collecting flow should cancel and `send` would never get called (I think)

eygraber

03/31/2023, 10:02 PM
Unless the `collector` was already invoked right before the cancellation

kevin.cianfarini

03/31/2023, 10:04 PM
If the collector was already invoked right before cancellation, the only way cancellation could be propagated would be the following (I think):
1. collection yields new item
2. Channel.send suspends
3. Something else happens and cancels the channel
Since that send is already theoretically suspended, this applies from the docs:
Closing a channel after this function has suspended does not cause this suspended send invocation to abort, because closing a channel is conceptually like sending a special “close token” over this channel. All elements sent over the channel are delivered in first-in first-out order. The sent element will be delivered to receivers before the close token.

eygraber

03/31/2023, 10:06 PM
The sequence I'm worried about is:
1. collection yields a new item
2. Something else happens and cancels the channel
3. Channel.send is called
What I get from the `send` docs is that #3 won't check for cancellation, and will attempt to send

kevin.cianfarini

03/31/2023, 10:08 PM
I don’t believe that scenario is possible from your above code
This is the only questionable bit that might need wrapping with `isClosedForSend`
if(stateFlow.value is Loadable.Initial<*>) {
  send(Loadable.Loading(placeholder))
  delay(minLoadingDurationMillis)
}
Since there’s no additional suspension point between receiving an item from a flow and sending it to the channel here
stateFlow.collect { value ->
  send(Loadable.Loaded(value))
}
You are guaranteed to never run into the scenario you are concerned about (I think)
The sequence of events you’re concerned about would happen with the following:
1. flow emits new item
2. SUSPEND (yield, perhaps?)
3. send is called (but channel was closed)

eygraber

03/31/2023, 10:16 PM
The docs for `isClosedForSend` mention that it shouldn't be relied on because it can return false, and then immediately afterwards return true (I'm assuming this is why it was annotated as being delicate). In any case, in your sequence when does the cancellation happen? If during #2 then it should be fine. If after #2 but before #3 then it seems to be the same as my sequence

kevin.cianfarini

03/31/2023, 10:17 PM
`send` will only check for cancellation if it’s going to suspend. On unlimited channels it will never suspend. After #2 and before #3 isn’t a thing: cancellation is explicitly checked by lots of suspending functions. It doesn’t “just happen”. Since there are no other suspend funs between 2 and 3, that’s not a possibility

eygraber

03/31/2023, 10:18 PM
Can't the underlying worker thread itself pause, and another thread cause the cancellation?

kevin.cianfarini

03/31/2023, 10:19 PM
Hm, yeah. I’m not sure if that would be an issue.

eygraber

03/31/2023, 10:19 PM
Hence my existential fear of `send` 😅

kevin.cianfarini

03/31/2023, 10:20 PM
I think the thing you should truly be concerned about is the thread getting parked after emission but prior to send, and another thread closing the send channel.
I don’t know what thread synchronization comes into play here for mutating that closed state
Wait
The channel here would only be closed for send if the flow returned from the `channelFlow` builder were to be cancelled, which would then invoke `awaitClose`, which would then invoke `job.cancel`, and I think things should just cascade?
Since this is a higher level API and you’re not interfacing directly with the channel, I don’t think you have as much to worry about when managing a closed channel
I’d be interested to get someone from JetBrains in here to school both of us

eygraber

03/31/2023, 10:29 PM
The channel could also be closed if its receiving side was cancelled.
But yeah I lost a lot of context on how this all works over the years. I just got used to relying on how well it works

kevin.cianfarini

03/31/2023, 10:31 PM
Which in this case will only ever happen when the flow collection is cancelled

eygraber

03/31/2023, 10:58 PM
I think I might have been confusing who the receiving side was here. I thought it was the collector of my flow, but the receiver is actually internal to my flow
i.e. the channel flow itself
I still feel like there's a scenario where the call to `send` happens after the `flow` was cancelled (and therefore the channel was closed), but my head is spinning, so I'm putting this aside for now. Thanks for responding, and like you said, hopefully someone from JB sees this and can give some definitive clarity 😄

Patrick Steiger

04/01/2023, 4:44 AM
Why not just `send` inside a coroutine `launch`ed inside a `supervisorScope`? If it throws, it shouldn’t matter, right?
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.channelFlow

suspend fun main(args: Array<String>) {
    channelFlow {
        supervisorScope {
            launch {
                while (true) {
                    delay(10)
                    println("Attempting to send, isClosedForSend=$isClosedForSend")
                    send(Unit)
                }
            }
            delay(15)
            println("Closing")
            close()
        }
    }.collect {
        // Simulate a slow collector. If the collector gets the closed channel token
        // before the other coroutine attempts to send, it cancels, and then `send`
        // throws a `CancellationException` instead of `ClosedSendChannelException`.
        // Close->Cancel->Send vs Close->Send
        delay(50)
    }
}
outputs:
Attempting to send, isClosedForSend=false
Closing
Attempting to send, isClosedForSend=true
Exception in thread "DefaultDispatcher-worker-1" kotlinx.coroutines.channels.ClosedSendChannelException: Channel was closed
 ....
Process finished with exit code 0
The exception is printed to the console, but the process finishes successfully because the exception was thrown in a coroutine that is a child of a `supervisorScope`, so a child's failure does not fail the scope. With `coroutineScope`, the process fails with exit code 1, because a failure of the sending child fails the scope, which fails the collector coroutine, which throws in `main`
To change the printing behavior, just set a `CoroutineExceptionHandler` around the `supervisorScope`:
suspend fun main(args: Array<String>) {
    channelFlow {
        withContext(CoroutineExceptionHandler { _, _ -> }) {
            supervisorScope {
                // ...
            }
        }
    }.collect {
        // ...
    }
}

kevin.cianfarini

04/01/2023, 4:00 PM
It still begs the question of why this is necessary. From the docs for Channel send.
A failed channel rethrows the original close cause exception on send attempts.
This exception is a subclass of IllegalStateException, because, conceptually, it is the sender’s responsibility to close the channel and not try to send anything thereafter. Attempts to send to a closed channel indicate a logical error in the sender’s code.

Patrick Steiger

04/01/2023, 4:06 PM
Well, the way I see it, that’s because in the example I posted, sending to the channel is done concurrently with closing the channel, so supervisor scope is a strategy to deal with this. Some other atomic synchronization strategy is possible I guess
In an example where sending to the channel is always done in the same coroutine that closes the channel, it’s easy to not send after closing.
Just reread this whole thread,
The sequence I’m worried about is:
1. collection yields a new item
2. Something else happens and cancels the channel
3. Channel.send is called
What I get from the `send` docs is that #3 won’t check for cancellation, and will attempt to send
@eygraber you don’t need to worry about the channel being cancelled (not closed), as this will throw a normal `CancellationException` on `send` when it suspends. If it does not suspend, it will put the value in the buffer (even though there’s no collector anymore to collect from the buffer), but it won’t crash. The channel coroutine will stop once the buffer is full and `send` suspends
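import kotlinx.coroutines.*
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.channelFlow

suspend fun main(args: Array<String>) {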
    val flow = channelFlow {
        while (true) {
            Thread.sleep(10) // do not check for cancellation
            println("Attempting to send, isClosedForSend=$isClosedForSend isActive=$isActive")
            try {
                send(Unit)
            } catch (e: Throwable) {
                println("Caught $e")
                throw e
            }
            println("Sent")
        }
    }.buffer(2)
    coroutineScope {
        val collection = launch {
            flow.collect {
                println("Received $it")
            }
        }
        delay(30)
        collection.cancel()
    }
}
output:
Attempting to send, isClosedForSend=false isActive=true
Sent
Received kotlin.Unit
Attempting to send, isClosedForSend=false isActive=true
Sent
Received kotlin.Unit
Attempting to send, isClosedForSend=false isActive=false
Sent
Attempting to send, isClosedForSend=false isActive=false
Sent
Attempting to send, isClosedForSend=false isActive=false
Caught kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@b50cd11
Notice that the collector is cancelled after receiving two elements, but because sending is done in a tight loop and the channel has a buffer of size 2, it manages to put two more values in the buffer, and only on the third attempt after cancellation does it suspend and throw `CancellationException` (buffer is full)
So if you are in a tight loop and do not want to keep filling the buffer (worrisome with an unlimited buffer), you must check for cancellation before `send`. Other than that, no worries
Using `buffer(Channel.UNLIMITED)` makes the code above run forever, as it does not check for cancellation before sending, probably until an OOM error
A tight loop is not your case: `stateFlow.collect {}` will check for cancellation at every new element.
A subscriber to a shared flow is always cancellable, and checks for cancellation before each emission.
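A sketch of the "check for cancellation before `send`" advice for the tight-loop case, assuming an unlimited buffer and a blocking loop body. The `ticks` and `collectBriefly` names are made up for illustration, and `ensureActive()` is just one way to do the check (`yield()` or testing `isActive` are alternatives).

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical producer: a blocking tight loop feeding an unlimited buffer.
fun ticks(): Flow<Int> = channelFlow<Int> {
    var next = 0
    while (true) {
        Thread.sleep(1) // blocking work that never checks for cancellation
        ensureActive()  // throws CancellationException once this producer has been cancelled
        send(next++)    // does not suspend here because the buffer is unlimited
    }
}.buffer(Channel.UNLIMITED)

// Collects for a short while, cancels the collector, then waits for the producer to finish.
fun collectBriefly(): Int = runBlocking {
    val received = AtomicInteger()
    val collection = launch(Dispatchers.Default) {
        ticks().collect { received.incrementAndGet() }
    }
    delay(50)
    // Waits for the producer too; the explicit check is what lets the loop exit.
    collection.cancelAndJoin()
    received.get()
}

fun main() {
    println("Received ${collectBriefly()} elements before cancellation")
}
```

Going by the behaviour described above, removing the `ensureActive()` call would leave the loop filling the buffer indefinitely after the collector is cancelled.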

eygraber

04/02/2023, 4:46 AM
I'm surprised that it doesn't throw `ClosedSendChannelException` in that case. The docs make it seem like it would. In my case I'm supposing that the cancellation happens after `collect` but before `send`, i.e.
stateFlow.collect {
  // at this point, the underlying thread pauses and sometime later the caller scope is closed
  // eventually the thread wakes up and send is called
  send(...)
  // Based on the docs, I'd assume that send would throw at this point because it doesn't check for cancellation, doesn't suspend, and attempts to send on a closed channel
}

Patrick Steiger

04/02/2023, 2:41 PM
By caller scope closed, I assume you mean the collect job is being cancelled. Then in your example above, either:
1. `send` will throw `CancellationException` if it suspends (buffer is full/no buffer)
2. `send` won’t suspend and won’t fail, but `collect` will then throw `CancellationException` while suspended waiting for a new value in the next iteration
There’s some confusion between a channel being cancelled and being `close`d. Only a closed channel throws `ClosedSendChannelException`

eygraber

04/02/2023, 3:05 PM
What I'm worried about is the doc for isClosedForSend which mentions that:
Returns true if this channel was closed by an invocation of close or its receiving side was cancelled. This means that calling send will result in an exception.

Patrick Steiger

04/02/2023, 3:25 PM
@eygraber `receive side was cancelled` == cancelling the `ReceiveChannel`, which is not the same as cancelling the `channelFlow` collector. Cancelling a `ReceiveChannel` closes the channel with a `CancellationException` (`isClosedForSend == true`), but cancelling a `channelFlow` collector does not close the channel (`isClosedForSend == false`). Still, cancelling a `ReceiveChannel` would make `send` throw `CancellationException`
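import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach

// Assumed wrapper: the original snippet omits its enclosing scope
suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Int>(capacity = 0)
    launch {
        (1..5).forEach {
            Thread.sleep(1000)
            println("Sending $it ${channel.isClosedForSend}")
            try {
                channel.send(it)
            } catch (e: Throwable) {
                println("Caught $e")
                throw e
            }
        }
    }
    launch {
        channel.consumeEach {
            println("Received $it")
        }
    }
    delay(1500)
    channel.cancel() // ReceiveChannel.cancel()
}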
output:
Sending 1 false
Received 1
Sending 2 true
Caught java.util.concurrent.CancellationException: Channel was cancelled
I believe the only scenario where `send` throws `ClosedSendChannelException` is when the `SendChannel` is `close`d.
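A small side-by-side sketch of that distinction, assuming two plain unlimited channels: one is closed from the sending side, the other is cancelled from the receiving side. The `sendOutcome` and `closeVersusCancel` names are made up for illustration.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.runBlocking

// Hypothetical helper: reports which exception type, if any, `send` throws.
suspend fun sendOutcome(channel: Channel<Int>): String =
    try {
        channel.send(1)
        "sent"
    } catch (e: ClosedSendChannelException) {
        "ClosedSendChannelException"
    } catch (e: CancellationException) {
        "CancellationException"
    }

fun closeVersusCancel(): List<String> = runBlocking {
    val closed = Channel<Int>(Channel.UNLIMITED)
    closed.close()     // SendChannel.close() without a cause

    val cancelled = Channel<Int>(Channel.UNLIMITED)
    cancelled.cancel() // ReceiveChannel.cancel()

    listOf(sendOutcome(closed), sendOutcome(cancelled))
}

fun main() {
    println(closeVersusCancel()) // [ClosedSendChannelException, CancellationException]
}
```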

eygraber

04/03/2023, 12:54 AM
OK I think it makes sense to me now. I was misreading `its receiving side was cancelled` and not following the actual link in the docs, which goes to `ReceiveChannel.cancel` and has nothing to do with the collector getting cancelled 🙈