eygraber
03/31/2023, 8:42 PM
channelFlow is there a way to guarantee that send won't throw because it was called after the channel was closed?
Before coroutines 1.7 I used if (!isClosedForSend) send(...), but that is now annotated as DelicateCoroutinesApi, and I see why it's not the best approach (although it did solve, long term, the issue where nearly any place I used send would eventually crash due to the receiver getting cancelled).
Should I just always use trySend?
kevin.cianfarini
03/31/2023, 9:06 PM
trySend could break backpressure, right?
eygraber
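A minimal sketch of the backpressure concern, assuming a plain `Channel` with a one-element buffer rather than the `channelFlow` from the question. `trySendOutcomes` is a hypothetical name used only for illustration. `trySend` never suspends, so when the buffer is full it returns a failed result immediately instead of waiting for the receiver, and the element is dropped unless the caller handles that result.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: reports how three consecutive trySend calls behave.
// Returns (first succeeded, second failed because the buffer was full,
// third failed because the channel was closed).
fun trySendOutcomes(): Triple<Boolean, Boolean, Boolean> {
    val channel = Channel<Int>(capacity = 1)
    val first = channel.trySend(1)   // buffer has room
    val second = channel.trySend(2)  // buffer full: fails right away, does not suspend
    channel.close()
    val third = channel.trySend(3)   // closed: reported in the result, nothing is thrown
    return Triple(first.isSuccess, second.isFailure && !second.isClosed, third.isClosed)
}

fun main() {
    println(trySendOutcomes())
}
```

So `trySend` avoids the exception on a closed channel, but at the cost of the suspension that `send` uses to slow the producer down.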
03/31/2023, 9:07 PM
eygraber
03/31/2023, 9:16 PM
send and trySend. It has all of the behavior of send except it doesn't throw if you call send after close
kevin.cianfarini
03/31/2023, 9:16 PM
eygraber
03/31/2023, 9:24 PM
offerCatching to get around the fact that offer threw as well. I guess I could go back to that, but it's annoying to have to do that, and be vigilant about not using send
kevin.cianfarini
03/31/2023, 9:41 PM
ClosedSendChannelException is not a subclass of CancellationException. Just don’t catch Throwable or IllegalStateException. https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-closed-send-channel-exception/
kevin.cianfarini
03/31/2023, 9:42 PM
eygraber
03/31/2023, 9:42 PM
kevin.cianfarini
03/31/2023, 9:42 PM
kevin.cianfarini
03/31/2023, 9:43 PM
A failed channel rethrows the original close cause exception on send attempts.
This exception is a subclass of IllegalStateException, because, conceptually, it is the sender’s responsibility to close the channel and not try to send anything thereafter. Attempts to send to a closed channel indicate a logical error in the sender’s code.
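A minimal sketch of the suggestion above, assuming a plain buffered `Channel`. The extension name `sendCatchingClosed` is hypothetical and not part of kotlinx.coroutines. It catches only `ClosedSendChannelException`. Catching `IllegalStateException` would be too broad, because on the JVM `CancellationException` is also a subclass of `IllegalStateException` and must keep propagating.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: suspends like send (so backpressure is kept), but
// returns false instead of throwing when the channel was closed without a cause.
suspend fun <T> SendChannel<T>.sendCatchingClosed(element: T): Boolean =
    try {
        send(element)
        true
    } catch (e: ClosedSendChannelException) {
        // Deliberately narrow: CancellationException is not caught here.
        false
    }

fun main() = runBlocking {
    val channel = Channel<Int>(Channel.BUFFERED)
    println(channel.sendCatchingClosed(1)) // sent while the channel is open
    channel.close()
    println(channel.sendCatchingClosed(2)) // channel already closed
}
```

If the channel was closed with a cause, `send` rethrows that cause instead, and this helper lets it through.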
kevin.cianfarini
03/31/2023, 9:45 PM
awaitClose to unregister the underlying callback prior to trying to send?
kevin.cianfarini
03/31/2023, 9:45 PM
eygraber
03/31/2023, 9:48 PM
awaitClose in this case because the body of the channelFlow is wrapped in a coroutineScope (I can post a sample soon).
What I'm worried about is the doc for isClosedForSend which mentions that (emphasis mine):
Returns true if this channel was closed by an invocation of close or its receiving side was cancelled. This means that calling send will result in an exception.
kevin.cianfarini
03/31/2023, 9:50 PM
coroutineScope would preclude you from using awaitClose
eygraber
03/31/2023, 9:52 PM
awaitClose would help here. I'm calling send from another flow's collect.
Here's the code:
channelFlow {
    coroutineScope {
        val stateFlow = flow.stateIn(this)
        val fastPath = launch {
            stateFlow.collect { value ->
                send(Loadable.Loaded(value))
            }
        }
        launch {
            delay(initialToLoadingMillis)
            fastPath.cancel()
            if (stateFlow.value is Loadable.Initial<*>) {
                send(Loadable.Loading(placeholder))
                delay(minLoadingDurationMillis)
            }
            stateFlow.collect { value ->
                send(Loadable.Loaded(value))
            }
        }
    }
}
kevin.cianfarini
03/31/2023, 9:55 PM
channelFlow {
    val job = launch {
        // do the body of coroutineScope here
    }
    awaitClose { job.cancel() }
}
kevin.cianfarini
03/31/2023, 9:55 PM
eygraber
03/31/2023, 10:00 PM
ClosedSendChannelException because the send docs say:
Note that this function does not check for cancellation when it is not suspended. Use yield or CoroutineScope.isActive to periodically check for cancellation in tight loops if needed.
So even if awaitClose cancels the job, send could still follow through with its call. I'm not sure if the channel is considered closed or not at that point, but if it is, then send will throw
kevin.cianfarini
03/31/2023, 10:01 PM
eygraber
03/31/2023, 10:02 PM
collector was already invoked right before the cancellation
kevin.cianfarini
03/31/2023, 10:04 PM
Closing a channel after this function has suspended does not cause this suspended send invocation to abort, because closing a channel is conceptually like sending a special “close token” over this channel. All elements sent over the channel are delivered in first-in first-out order. The sent element will be delivered to receivers before the close token.
eygraber
03/31/2023, 10:06 PM
The sequence I'm worried about is:
1. collection yields a new item
2. Something else happens and cancels the channel
3. Channel.send is called
What I get from the send docs is that #3 won't check for cancellation, and will attempt to send
kevin.cianfarini
03/31/2023, 10:08 PM
kevin.cianfarini
03/31/2023, 10:08 PM
if (stateFlow.value is Loadable.Initial<*>) {
    send(Loadable.Loading(placeholder))
    delay(minLoadingDurationMillis)
}
kevin.cianfarini
03/31/2023, 10:09 PM
stateFlow.collect { value ->
    send(Loadable.Loaded(value))
}
You are guaranteed to never run into the scenario you are concerned about (I think)
kevin.cianfarini
03/31/2023, 10:10 PM
eygraber
03/31/2023, 10:16 PM
isClosedForSend mention that it shouldn't be relied on because it can return false, and then immediately afterwards return true (I'm assuming this is why it was annotated as being delicate).
In any case, in your sequence when does the cancellation happen? If during #2 then it should be fine. If after #2 but before #3 then it seems to be the same as my sequence
kevin.cianfarini
03/31/2023, 10:17 PM
eygraber
03/31/2023, 10:18 PM
kevin.cianfarini
03/31/2023, 10:19 PM
eygraber
03/31/2023, 10:19 PM
send 😅
kevin.cianfarini
03/31/2023, 10:20 PM
kevin.cianfarini
03/31/2023, 10:21 PM
kevin.cianfarini
03/31/2023, 10:22 PM
kevin.cianfarini
03/31/2023, 10:23 PM
channelFlow builder were to be cancelled, which would then invoke awaitClose, which would then invoke job.cancel and I think things should just cascade?
kevin.cianfarini
03/31/2023, 10:24 PM
kevin.cianfarini
03/31/2023, 10:25 PM
eygraber
03/31/2023, 10:29 PM
eygraber
03/31/2023, 10:29 PM
kevin.cianfarini
03/31/2023, 10:31 PM
eygraber
03/31/2023, 10:58 PM
eygraber
03/31/2023, 10:58 PM
eygraber
03/31/2023, 11:00 PM
send happens after the flow was cancelled (and therefore the channel was closed), but my head is spinning, so I'm putting this aside for now.
Thanks for responding, and like you said, hopefully someone from JB sees this and can give some definitive clarity 😄
Patrick Steiger
04/01/2023, 4:44 AM
supervisorScope? If it throws, shouldn’t matter, right?
Patrick Steiger
04/01/2023, 2:54 PM
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.channelFlow

suspend fun main(args: Array<String>) {
    channelFlow {
        supervisorScope {
            launch {
                while (true) {
                    delay(10)
                    println("Attempting to send, isClosedForSend=$isClosedForSend")
                    send(Unit)
                }
            }
            delay(15)
            println("Closing")
            close()
        }
    }.collect {
        // Simulate a slow collector. If the collector received the close token before
        // the other coroutine attempted to send, it would cancel, and `send` would then
        // throw a `CancellationException` instead of `ClosedSendChannelException`
        // (Close -> Cancel -> Send vs. Close -> Send).
        delay(50)
    }
}
outputs:
Attempting to send, isClosedForSend=false
Closing
Attempting to send, isClosedForSend=true
Exception in thread "DefaultDispatcher-worker-1" kotlinx.coroutines.channels.ClosedSendChannelException: Channel was closed
....
Process finished with exit code 0
The exception is printed to the console, but the process finishes successfully because the exception was thrown in a coroutine that is a child of a supervisorScope, so a child's failure does not fail the scope. With coroutineScope, the process fails with exit code 1, because a failure of the sending child fails the scope, which fails the collector coroutine, which throws in main.
To change the printing behavior, just set a CoroutineExceptionHandler in `supervisorScope`:
suspend fun main(args: Array<String>) {
    channelFlow {
        withContext(CoroutineExceptionHandler { _, _ -> }) {
            supervisorScope {
                // ...
            }
        }
    }.collect {
        // ...
    }
}
kevin.cianfarini
04/01/2023, 4:00 PM
A failed channel rethrows the original close cause exception on send attempts.
This exception is a subclass of IllegalStateException, because, conceptually, it is the sender’s responsibility to close the channel and not try to send anything thereafter. Attempts to send to a closed channel indicate a logical error in the sender’s code.
Patrick Steiger
04/01/2023, 4:06 PM
Patrick Steiger
04/01/2023, 4:08 PM
Patrick Steiger
04/01/2023, 8:54 PM
The sequence I’m worried about is:
1. collection yields a new item
2. Something else happens and cancels the channel
3. Channel.send is called
What I get from the send docs is that #3 won’t check for cancellation, and will attempt to send
@eygraber you don’t need to worry about the channel being cancelled (not closed), as this will throw a normal CancellationException on send when it suspends. If it does not suspend, it will put the value in the buffer (even though there’s no collector anymore to collect from the buffer), but it won’t crash. The channel coroutine will stop once the buffer is full and send suspends
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun main(args: Array<String>) {
    val flow = channelFlow {
        while (true) {
            Thread.sleep(10) // do not check for cancellation
            println("Attempting to send, isClosedForSend=$isClosedForSend isActive=$isActive")
            try {
                send(Unit)
            } catch (e: Throwable) {
                println("Caught $e")
                throw e
            }
            println("Sent")
        }
    }.buffer(2)
    coroutineScope {
        val collection = launch {
            flow.collect {
                println("Received $it")
            }
        }
        delay(30)
        collection.cancel()
    }
}
output:
Attempting to send, isClosedForSend=false isActive=true
Sent
Received kotlin.Unit
Attempting to send, isClosedForSend=false isActive=true
Sent
Received kotlin.Unit
Attempting to send, isClosedForSend=false isActive=false
Sent
Attempting to send, isClosedForSend=false isActive=false
Sent
Attempting to send, isClosedForSend=false isActive=false
Caught kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@b50cd11
Notice that the collector is cancelled after receiving two elements, but because sending is done in a tight loop and the channel has a buffer of size 2, it manages to put two more values in the buffer. Only on the third attempt after cancellation does it suspend and throw CancellationException (buffer is full)
Patrick Steiger
04/01/2023, 8:55 PM
send. Other than that, no worries
Patrick Steiger
04/01/2023, 8:56 PM
buffer(Channel.UNLIMITED) makes the code above run forever, as it does not check for cancellation before sending, probably until an OOM error
Patrick Steiger
04/01/2023, 9:03 PM
stateFlow.collect {} will check for cancellation at every new element.
A subscriber to a shared flow is always cancellable, and checks for cancellation before each emission.
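For a producer that is not driven by a shared flow, the cancellation check has to be added by hand. The following is a minimal sketch, assuming an unlimited `Channel` (so `send` never suspends) and a tight loop on `Dispatchers.Default`. `cancelTightSender` is a hypothetical name, and the 50 ms delay is an arbitrary choice for illustration. Without the `ensureActive()` call the loop would not notice cancellation at all.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical demo: a tight send loop that checks for cancellation itself.
suspend fun cancelTightSender(): Boolean = coroutineScope {
    val channel = Channel<Int>(Channel.UNLIMITED)
    val sender = launch(Dispatchers.Default) {
        var next = 0
        while (true) {
            ensureActive()       // throws CancellationException once the job is cancelled
            channel.send(next++) // never suspends on an unlimited buffer
        }
    }
    delay(50)
    sender.cancelAndJoin()       // returns only because the loop checks for cancellation
    sender.isCancelled
}

fun main() = runBlocking {
    println(cancelTightSender())
}
```

`yield()` or an explicit `isActive` check would serve the same purpose, as the `send` documentation quoted earlier suggests.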
eygraber
04/02/2023, 4:46 AM
collect but before send, i.e.
stateFlow.collect {
    // at this point, the underlying thread pauses and sometime later the caller scope is closed
    // eventually the thread wakes up and send is called
    send(...)
    // Based on the docs, I'd assume that send would throw at this point because it doesn't check for cancellation, doesn't suspend, and attempts to send on a closed channel
}
Patrick Steiger
04/02/2023, 2:41 PM
CancellationException if it suspends (buffer is full/no buffer)
2. Send won’t suspend and won’t fail, but collect will then throw CancellationException while suspended waiting for a new value in the next iteration
Patrick Steiger
04/02/2023, 2:56 PM
ClosedSendChannelException
eygraber
04/02/2023, 3:05 PM
Returns true if this channel was closed by an invocation of close or its receiving side was cancelled. This means that calling send will result in an exception.
Patrick Steiger
04/02/2023, 3:25 PM
receive side was cancelled == cancelling the ReceiveChannel, which is not the same as cancelling the channelFlow collector. Cancelling a ReceiveChannel closes the channel with a CancellationException (isClosedForSend == true), but cancelling a channelFlow collector does not close the channel (isClosedForSend == false).
Still, cancelling a ReceiveChannel would make send throw CancellationException
val channel = Channel<Int>(capacity = 0)
launch {
    (1..5).forEach {
        Thread.sleep(1000)
        println("Sending $it ${channel.isClosedForSend}")
        try {
            channel.send(it)
        } catch (e: Throwable) {
            println("Caught $e")
            throw e
        }
    }
}
launch {
    channel.consumeEach {
        println("Received $it")
    }
}
delay(1500)
channel.cancel() // ReceiveChannel.cancel()
output:
Sending 1 false
Received 1
Sending 2 true
Caught java.util.concurrent.CancellationException: Channel was cancelled
Patrick Steiger
04/02/2023, 3:26 PM
send throws ClosedSendChannelException is when the SendChannel is `close`d.
eygraber
04/03/2023, 12:54 AM
its receiving side was cancelled and not following the actual link in the docs which goes to ReceiveChannel.cancel and has nothing to do with the collector getting cancelled 🙈
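The distinction the thread settles on can be sketched side by side. This is a minimal illustration on plain channels, not on `channelFlow`, and `sendFailure` is a hypothetical helper. `close()` without a cause makes a later `send` fail with `ClosedSendChannelException`, while `ReceiveChannel.cancel()` makes it fail with `CancellationException`.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the exception thrown by send, or null if it succeeded.
suspend fun sendFailure(channel: Channel<Int>): Throwable? =
    runCatching { channel.send(1) }.exceptionOrNull()

fun main() = runBlocking {
    val closed = Channel<Int>(Channel.BUFFERED).also { it.close() }     // SendChannel.close()
    val cancelled = Channel<Int>(Channel.BUFFERED).also { it.cancel() } // ReceiveChannel.cancel()
    println(sendFailure(closed) is ClosedSendChannelException)
    println(sendFailure(cancelled) is CancellationException)
}
```

`runCatching` is used here only to inspect the exception. In real coroutine code a caught `CancellationException` should normally be rethrown.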