Nick Williams
04/17/2022, 10:51 AMfun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
val callback = object : Callback { // Implementation of some callback interface
override fun onNextValue(value: T) {
// To avoid blocking you can configure channel capacity using
// either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill
trySendBlocking(value)
.onFailure { throwable ->
// Downstream has been cancelled or failed, can log here
}
}
override fun onApiError(cause: Throwable) {
cancel(CancellationException("API Error", cause))
}
override fun onCompleted() = channel.close()
}
api.register(callback)
/*
* Suspends until either 'onCompleted'/'onApiError' from the callback is invoked
* or flow collector is cancelled (e.g. by 'take(1)' or because a collector's coroutine was cancelled).
* In both cases, callback will be properly unregistered.
*/
awaitClose {
require(isClosedForSend) // <--- THIS FAILS
api.unregister(callback)
}
}
ephemient
04/17/2022, 11:25 AMSuspends the current coroutine until the channel is either closed or cancelled and invokes the given block before resuming the coroutine.in this case you're getting cancelled before closed
Nick Williams
04/17/2022, 11:40 AMSuspends the current coroutine until the channel is either closed or cancelled
ephemient
04/17/2022, 12:12 PMNick Williams
04/17/2022, 12:20 PMephemient
04/17/2022, 12:22 PMNick Williams
04/17/2022, 12:23 PMephemient
04/17/2022, 12:29 PMNick Williams
04/17/2022, 12:36 PM