eygraber
03/31/2023, 8:42 PMchannelFlow
is there a way to guarantee that send won't throw because it was called after the channel was closed?
Pre coroutines 1.7 I used if(!isClosedForSend) send(...) but now that is annotated as DelicateCoroutinesApi and I see why it's not the best approach (although in the long term it did solve the issue where nearly any place I used send would crash eventually due to the receiver getting cancelled).
Should I just always use trySend?
kevin.cianfarini
03/31/2023, 9:06 PMtrySend
could break backpressure, right?
eygraber
03/31/2023, 9:07 PMsend
and trySend
. It has all of the behavior of send
except it doesn't throw if you call send
after close
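For reference, a minimal sketch of how trySend differs from send, assuming kotlinx.coroutines 1.5 or later (where trySend returns a ChannelResult). It never suspends and does not throw on a closed channel, but a full buffer is reported as a failure instead of applying backpressure. The outcome helper, the channel and its capacity are illustrative only.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ChannelResult

// Hypothetical helper for this sketch: names the three possible outcomes of trySend.
fun <T> outcome(result: ChannelResult<T>): String = when {
    result.isSuccess -> "sent"
    result.isClosed -> "closed"
    else -> "buffer full"
}

fun main() {
    val channel = Channel<Int>(capacity = 1)
    println(outcome(channel.trySend(1))) // sent
    println(outcome(channel.trySend(2))) // buffer full (the element is dropped, no suspension)
    channel.close()
    println(outcome(channel.trySend(3))) // closed
}
```

The second call is where backpressure is lost: send would have suspended until a receiver made room.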
kevin.cianfarini
03/31/2023, 9:16 PM
eygraber
03/31/2023, 9:24 PMofferCatching
to get around the fact that offer
threw as well. I guess I could go back to that, but it's annoying to have to do that, and be vigilant about not using send
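A wrapper of the kind described above could be sketched as follows. sendUnlessClosed is a hypothetical name, not a kotlinx.coroutines API. It keeps the suspending behavior of send and only swallows ClosedSendChannelException, so cancellation and any cause passed to close(cause) still propagate.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: suspends like send, returns false instead of throwing
// when the channel was closed normally (close() without a cause).
suspend fun <T> SendChannel<T>.sendUnlessClosed(element: T): Boolean =
    try {
        send(element)
        true
    } catch (e: ClosedSendChannelException) {
        false
    }

fun main() = runBlocking {
    val channel = Channel<Int>(capacity = 1)
    println(channel.sendUnlessClosed(1)) // true (buffered)
    channel.close()
    println(channel.sendUnlessClosed(2)) // false (channel already closed)
}
```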
kevin.cianfarini
03/31/2023, 9:41 PMClosedSendChannelException
is not a subclass of CancellationException. Just don’t catch Throwable or IllegalStateException.
https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-closed-send-channel-exception/
eygraber
03/31/2023, 9:42 PM
kevin.cianfarini
03/31/2023, 9:42 PMA failed channel rethrows the original close cause exception on send attempts.
This exception is a subclass of IllegalStateException, because, conceptually, it is the sender’s responsibility to close the channel and not try to send anything thereafter. Attempts to send to a closed channel indicate a logical error in the sender’s code.
awaitClose
to unregister the underlying callback prior to trying to send?
eygraber
03/31/2023, 9:48 PMawaitClose
in this case because the body of the channelFlow is wrapped in a coroutineScope (I can post a sample soon).
What I'm worried about is the doc for isClosedForSend which mentions that (emphasis mine):
Returns true if this channel was closed by an invocation of close or its receiving side was cancelled. This means that calling send will result in an exception.
kevin.cianfarini
03/31/2023, 9:50 PMcoroutineScope
would preclude you from using awaitClose
eygraber
03/31/2023, 9:52 PMawaitClose
would help here. I'm calling send from another flow's collect.
Here's the code:
channelFlow {
    coroutineScope {
        val stateFlow = flow.stateIn(this)
        val fastPath = launch {
            stateFlow.collect { value ->
                send(Loadable.Loaded(value))
            }
        }
        launch {
            delay(initialToLoadingMillis)
            fastPath.cancel()
            if (stateFlow.value is Loadable.Initial<*>) {
                send(Loadable.Loading(placeholder))
                delay(minLoadingDurationMillis)
            }
            stateFlow.collect { value ->
                send(Loadable.Loaded(value))
            }
        }
    }
}
kevin.cianfarini
03/31/2023, 9:55 PM
channelFlow {
    val job = launch {
        // do the body of coroutineScope here
    }
    awaitClose { job.cancel() }
}
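A runnable variant of this suggestion, as a sketch only: relay, source and the use of first() are made up for illustration, with first() standing in for a collector that goes away early. When the collector stops, the channelFlow block is cancelled, awaitClose runs its block, and the launched job is cancelled with it.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical function: forwards every value of `source` through a channelFlow.
fun relay(source: StateFlow<Int>): Flow<Int> = channelFlow<Int> {
    val job = launch {
        // A StateFlow never completes, so this child runs until it is cancelled.
        source.collect { value -> send(value) }
    }
    // Suspends until the channel is closed or the collector is cancelled,
    // then cancels the forwarding job.
    awaitClose { job.cancel() }
}

fun main() = runBlocking {
    val source = MutableStateFlow(0)
    // first() takes one value and then cancels the upstream channelFlow.
    println(relay(source).first()) // 0
}
```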
eygraber
03/31/2023, 10:00 PMClosedSendChannelException
because the send docs say:
Note that this function does not check for cancellation when it is not suspended. Use yield or CoroutineScope.isActive to periodically check for cancellation in tight loops if needed.
So even if awaitClose cancels the job, send could still follow through with its call. I'm not sure if the channel is considered closed or not at that point, but if it is, then send will throw
kevin.cianfarini
03/31/2023, 10:01 PM
eygraber
03/31/2023, 10:02 PMcollector
was already invoked right before the cancellation
kevin.cianfarini
03/31/2023, 10:04 PMClosing a channel after this function has suspended does not cause this suspended send invocation to abort, because closing a channel is conceptually like sending a special “close token” over this channel. All elements sent over the channel are delivered in first-in first-out order. The sent element will be delivered to receivers before the close token.
eygraber
03/31/2023, 10:06 PMsend
docs is that #3 won't check for cancellation, and will attempt to send
kevin.cianfarini
03/31/2023, 10:08 PM
if (stateFlow.value is Loadable.Initial<*>) {
    send(Loadable.Loading(placeholder))
    delay(minLoadingDurationMillis)
}
stateFlow.collect { value ->
    send(Loadable.Loaded(value))
}
You are guaranteed to never run into the scenario you are concerned about (I think)
eygraber
03/31/2023, 10:16 PMisClosedForSend
mention that it shouldn't be relied on because it can return false, and then immediately afterwards return true (I'm assuming this is why it was annotated as being delicate).
In any case, in your sequence when does the cancellation happen? If during #2 then it should be fine. If after #2 but before #3 then it seems to be the same as my sequence
kevin.cianfarini
03/31/2023, 10:17 PM
eygraber
03/31/2023, 10:18 PM
kevin.cianfarini
03/31/2023, 10:19 PM
eygraber
03/31/2023, 10:19 PMsend
😅
kevin.cianfarini
03/31/2023, 10:20 PMchannelFlow
builder were to be cancelled, which would then invoke awaitClose, which would then invoke job.cancel and I think things should just cascade?
eygraber
03/31/2023, 10:29 PM
kevin.cianfarini
03/31/2023, 10:31 PM
eygraber
03/31/2023, 10:58 PMsend
happens after the flow was cancelled (and therefore the channel was closed), but my head is spinning, so I'm putting this aside for now.
Thanks for responding, and like you said, hopefully someone from JB sees this and can give some definitive clarity 😄
Patrick Steiger
04/01/2023, 4:44 AMsupervisorScope
? If it throws, shouldn’t matter, right?
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun main(args: Array<String>) {
    channelFlow {
        supervisorScope {
            launch {
                while (true) {
                    delay(10)
                    println("Attempting to send, isClosedForSend=$isClosedForSend")
                    send(Unit)
                }
            }
            delay(15)
            println("Closing")
            close()
        }
    }.collect {
        // Simulate a slow collector. If the collector gets the closed channel token before the
        // other coroutine attempts to send, it cancels, and then `send` throws a
        // `CancellationException` instead of `ClosedSendChannelException`.
        // Close->Cancel->Send vs Close->Send
        delay(50)
    }
}
outputs:
Attempting to send, isClosedForSend=false
Closing
Attempting to send, isClosedForSend=true
Exception in thread "DefaultDispatcher-worker-1" kotlinx.coroutines.channels.ClosedSendChannelException: Channel was closed
....
Process finished with exit code 0
The exception is printed to the console, but the process finishes successfully because the exception was thrown in a coroutine that is a child of a supervisorScope, so a child's failure does not fail the scope. With coroutineScope, the process fails with exit code 1, because a failure of the sending child fails the scope, which fails the collector coroutine, which throws in main.
To change the printing behavior, just set a CoroutineExceptionHandler in `supervisorScope`:
suspend fun main(args: Array<String>) {
    channelFlow {
        withContext(CoroutineExceptionHandler { _, _ -> }) {
            supervisorScope {
                // ...
            }
        }
    }.collect {
        // ...
    }
}
kevin.cianfarini
04/01/2023, 4:00 PMA failed channel rethrows the original close cause exception on send attempts.
This exception is a subclass of IllegalStateException, because, conceptually, it is the sender’s responsibility to close the channel and not try to send anything thereafter. Attempts to send to a closed channel indicate a logical error in the sender’s code.
Patrick Steiger
04/01/2023, 4:06 PM
The sequence I’m worried about is:
1. collection yields a new item
2. Something else happens and cancels the channel
3. Channel.send is called
What I get from the send docs is that #3 won’t check for cancellation, and will attempt to send
@eygraber you don’t need to worry about the channel being cancelled (not closed), as this will throw a normal CancellationException on send when it suspends. If it does not suspend, it will put the value in the buffer (even though there’s no collector anymore to collect from the buffer), but it won’t crash. The channel coroutine will stop once the buffer is full and send suspends.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun main(args: Array<String>) {
    val flow = channelFlow {
        while (true) {
            Thread.sleep(10) // do not check for cancellation
            println("Attempting to send, isClosedForSend=$isClosedForSend isActive=$isActive")
            try {
                send(Unit)
            } catch (e: Throwable) {
                // Caught only to log; the exception is rethrown unchanged.
                println("Caught $e")
                throw e
            }
            println("Sent")
        }
    }.buffer(2)
    coroutineScope {
        val collection = launch {
            flow.collect {
                println("Received $it")
            }
        }
        delay(30)
        collection.cancel()
    }
}
output:
Attempting to send, isClosedForSend=false isActive=true
Sent
Received kotlin.Unit
Attempting to send, isClosedForSend=false isActive=true
Sent
Received kotlin.Unit
Attempting to send, isClosedForSend=false isActive=false
Sent
Attempting to send, isClosedForSend=false isActive=false
Sent
Attempting to send, isClosedForSend=false isActive=false
Caught kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@b50cd11
Notice that the collector is cancelled after receiving two elements, but because sending is done in a tight loop and the channel has a buffer of size 2, it manages to put two more values in the buffer, and only on the third attempt after cancellation does it suspend and throw CancellationException (buffer is full).
send
. Other than that, no worries
buffer(Channel.UNLIMITED) makes the code above run forever, as it does not check for cancellation before sending, probably until an OOM error.
stateFlow.collect {} will check for cancellation at every new element.
A subscriber to a shared flow is always cancellable, and checks for cancellation before each emission.
eygraber
04/02/2023, 4:46 AMcollect
but before send, i.e.
stateFlow.collect {
    // at this point, the underlying thread pauses and sometime later the caller scope is closed
    // eventually the thread wakes up and send is called
    send(...)
    // Based on the docs, I'd assume that send would throw at this point because it doesn't check for cancellation, doesn't suspend, and attempts to send on a closed channel
}
Patrick Steiger
04/02/2023, 2:41 PMCancellationException
if it suspends (buffer is full/no buffer)
2. Send won’t suspend and won’t fail, but collect will then throw CancellationException while suspended waiting for a new value in the next iteration
ClosedSendChannelException
eygraber
04/02/2023, 3:05 PMReturns true if this channel was closed by an invocation of close or its receiving side was cancelled. This means that calling send will result in an exception.
Patrick Steiger
04/02/2023, 3:25 PMreceive side was cancelled
== cancelling the ReceiveChannel, which is not the same as cancelling the channelFlow collector. Cancelling a ReceiveChannel closes the channel with a CancellationException (isClosedForSend == true), but cancelling a channelFlow collector does not close the channel (isClosedForSend == false).
Still, cancelling a ReceiveChannel would make send throw CancellationException
val channel = Channel<Int>(capacity = 0)
launch {
    (1..5).forEach {
        Thread.sleep(1000)
        println("Sending $it ${channel.isClosedForSend}")
        try {
            channel.send(it)
        } catch (e: Throwable) {
            println("Caught $e")
            throw e
        }
    }
}
launch {
    channel.consumeEach {
        println("Received $it")
    }
}
delay(1500)
channel.cancel() // ReceiveChannel.cancel()
output:
Sending 1 false
Received 1
Sending 2 true
Caught java.util.concurrent.CancellationException: Channel was cancelled
send
throws ClosedSendChannelException is when the SendChannel is `close`d.
eygraber
04/03/2023, 12:54 AMits receiving side was cancelled
and not following the actual link in the docs which goes to ReceiveChannel.cancel
and has nothing to do with the collector getting cancelled 🙈
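To make the distinction above concrete, here is a small sketch contrasting SendChannel.close() with ReceiveChannel.cancel(). sendFailure is a hypothetical helper written for this example, and the buffered channels are illustrative. In both cases isClosedForSend is true, but only close() makes send throw ClosedSendChannelException.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.runBlocking

// Hypothetical helper for this sketch: reports which exception, if any, send throws.
suspend fun sendFailure(channel: Channel<Int>): String =
    try {
        channel.send(1)
        "none"
    } catch (e: ClosedSendChannelException) {
        "ClosedSendChannelException"
    } catch (e: CancellationException) {
        "CancellationException"
    }

@OptIn(DelicateCoroutinesApi::class) // isClosedForSend is a delicate API since 1.7
fun main() = runBlocking {
    val closed = Channel<Int>(capacity = 1).also { it.close() }     // SendChannel.close()
    val cancelled = Channel<Int>(capacity = 1).also { it.cancel() } // ReceiveChannel.cancel()
    println("${closed.isClosedForSend} ${sendFailure(closed)}")       // true ClosedSendChannelException
    println("${cancelled.isClosedForSend} ${sendFailure(cancelled)}") // true CancellationException
}
```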