https://kotlinlang.org logo
#flow
Title
# flow
n

Nick Williams

04/17/2022, 10:51 AM
Any idea why the require in awaitClose would fail? According to docs the channel is closed before the block passed into awaitClose is invoked
Copy code
fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
    val callback = object : Callback { // Implementation of some callback interface
        override fun onNextValue(value: T) {
            // To avoid blocking you can configure channel capacity using
            // either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill
            trySendBlocking(value)
                .onFailure { throwable ->
                    // Downstream has been cancelled or failed, can log here
                }
        }
        override fun onApiError(cause: Throwable) {
            cancel(CancellationException("API Error", cause))
        }
        override fun onCompleted() = channel.close()
    }
    api.register(callback)
    /*
     * Suspends until either 'onCompleted'/'onApiError' from the callback is invoked
     * or flow collector is cancelled (e.g. by 'take(1)' or because a collector's coroutine was cancelled).
     * In both cases, callback will be properly unregistered.
     */
        awaitClose { 
            require(isClosedForSend) // <--- THIS FAILS
            api.unregister(callback) 
        }
    }
e

ephemient

04/17/2022, 11:25 AM
where in the docs does it say that? https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/await-close.html
Suspends the current coroutine until the channel is either closed or cancelled and invokes the given block before resuming the coroutine.
in this case you're getting cancelled before closed
n

Nick Williams

04/17/2022, 11:40 AM
in which case?
Suspends the current coroutine until the channel is either closed or cancelled
closed or cancelled?
where that states "This function closes the channel and removes all buffered sent elements from it"
so yes in either case according to the docs I would interpret that to mean the channel is closed when that block is executed!
e

ephemient

04/17/2022, 12:12 PM
didn't even think about that, it seems like a documentation bug because it's clearly scope cancellation that invokes the lambda, not channel cancellation
n

Nick Williams

04/17/2022, 12:20 PM
Ok well the code I pasted in was the example from the documentation (apart from the require statement) and you can see if make no attempt to handle closing of the channel if scope is cancelled meaning it would suffer from: _java.lang.IllegalStateException: 'awaitClose { yourCallbackOrListener.cancel() }' should be used in the end of callbackFlow block. Otherwise, a callback/listener may leak in case of external cancellation._
e

ephemient

04/17/2022, 12:22 PM
the channel does get closed once the scope is canceled, it just hasn't happened yet at that callback. any reason you need the require?
n

Nick Williams

04/17/2022, 12:23 PM
Well I'm getting the ISE above
Which is fixed by closing the channel inside awaitClose which seems like a weird thing to have to do
e

ephemient

04/17/2022, 12:29 PM
can't reproduce here, is it coming from the API you're using?
6 Views