# coroutines
k
Is there an interruptible callbackFlow (analogue to runInterruptible)? I.e., to timeout something that should get callbacks periodically that you’re trying to convert into a flow…
e
why do you need one?
val flow = callbackFlow {
    subscribe()
    awaitClose { unsubscribe() }
}
withTimeout(...) {
    flow.collect()
}
will naturally unsubscribe when the flow collector is stopped
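A minimal sketch of that claim, for anyone who wants to check it locally. `SilentSource` and `events()` are hypothetical stand-ins for the real callback API (they are not from this thread); the source never calls back, so the only way out is the collector's timeout.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical callback API: registers a listener but never invokes it.
class SilentSource {
    var subscribed = false
        private set

    fun subscribe(onValue: (Int) -> Unit) { subscribed = true }
    fun unsubscribe() { subscribed = false }
}

fun SilentSource.events(): Flow<Int> = callbackFlow {
    subscribe { value -> trySend(value) }  // assumed non-blocking registration
    awaitClose { unsubscribe() }            // runs when the collector is cancelled
}

fun main() = runBlocking {
    val source = SilentSource()
    // The timeout cancels the collector, which in turn triggers awaitClose.
    val result = withTimeoutOrNull(200) { source.events().collect { println(it) } }
    println("timed out: ${result == null}, still subscribed: ${source.subscribed}")
}
```

With a source that never calls back, this should report a timeout with the listener already unsubscribed.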
k
because say you have some old network API you want to wrap….it can take forever to return if there’s a network issue, but you want to report an error to your flow’s subscribers
u
if “old network api” means you call it and it gives you exactly one result (or one exception), you would want to use `suspendCancellableCoroutine` to wrap it into a suspend function, not into a flow. This suspend function can then be called inside `withTimeout {}`.
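A rough sketch of the one-shot case, assuming a made-up `LegacyClient` whose `fetch` takes success/error callbacks and returns a cancellable `Request`. None of these names come from the thread; the fake client here never calls back so the timeout path is exercised.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical one-shot callback API.
interface Request {
    fun cancel()
}

class LegacyClient {
    var cancelled = false
        private set

    // This fake never invokes either callback, simulating a hung call.
    fun fetch(onSuccess: (String) -> Unit, onError: (Throwable) -> Unit): Request =
        object : Request {
            override fun cancel() { cancelled = true }
        }
}

suspend fun LegacyClient.fetchSuspending(): String = suspendCancellableCoroutine { cont ->
    val request = fetch(
        onSuccess = { cont.resume(it) },
        onError = { cont.resumeWithException(it) },
    )
    // Propagate coroutine cancellation (including timeouts) to the underlying call.
    cont.invokeOnCancellation { request.cancel() }
}

fun main() = runBlocking {
    val client = LegacyClient()
    val result = withTimeoutOrNull(200) { client.fetchSuspending() }
    println("result: $result, request cancelled: ${client.cancelled}")
}
```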
k
Multiple results...sorry. Actual code is for a hardware API but thought a network API would be more understandable in this thread...
e
fun <T> Flow<T>.withTimeout(timeout: Duration): Flow<T> = flow {
    withTimeout(timeout) inner@{
        emitAll(this@withTimeout)
    }
}
do you perhaps want something like this?
k
nope…callbackFlow has to be terminated…not a flow. Need the callback flow to convert callback APIs to a Flow…
e
if you use this or something equivalent to wrap your callbackFlow does it give you the behavior you want or is it something different?
k
withTimeout doesn’t work w/ callbackFlow….if you wrap resulting callbackFlow, it times out the flow but not the actual callback handling which is what you want to time out. the goal is to return an error state in the flow for any subscribers to see (not have each subscriber handle timeouts)
e
withTimeout does work, it will cancel the flow which triggers awaitClose
unless you have blocking code directly in the callbackFlow block? that is not the intended use and if that's the case, maybe you should be using `channelFlow`+`launch` instead
k
hmm…channelFlow + launch (timeout in the launch) is what we’ve been doing…. 😂 thought callbackFlow w/ a timeout made more sense…there’s no blocking code..it’s a callback API…it’s just that the callback sometimes never calls back…
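For reference, one possible shape of "callbackFlow with a timeout" that puts an error state into the flow itself, so subscribers don't each handle timeouts. This is only a sketch under assumptions: `FlakyDevice`, `Reading`, and `silenceLimit` are invented for illustration, and the real hardware API is assumed to register listeners without blocking. A watchdog coroutine launched inside the `callbackFlow` block waits for a heartbeat from each callback; if none arrives in time it emits `TimedOut` and closes the flow.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

sealed interface Reading {
    data class Value(val value: Int) : Reading
    object TimedOut : Reading
}

// Hypothetical hardware API: delivers two values on subscribe, then goes silent.
class FlakyDevice {
    var subscribed = false
        private set

    fun subscribe(listener: (Int) -> Unit) {
        subscribed = true
        listener(1)
        listener(2)
    }

    fun unsubscribe() { subscribed = false }
}

fun FlakyDevice.readings(silenceLimit: Duration): Flow<Reading> = callbackFlow {
    // Conflated: the watchdog only needs to know that at least one callback arrived.
    val heartbeat = Channel<Unit>(Channel.CONFLATED)
    subscribe { value ->
        trySend(Reading.Value(value))
        heartbeat.trySend(Unit)
    }
    launch {
        // Keep looping while callbacks keep arriving within the limit.
        while (withTimeoutOrNull(silenceLimit) { heartbeat.receive() } != null) { }
        send(Reading.TimedOut)  // surface the error state to every collector
        close()                 // completes the flow and resumes awaitClose
    }
    awaitClose { unsubscribe() }
}

fun main() = runBlocking {
    val device = FlakyDevice()
    val seen = device.readings(200.milliseconds).toList()
    println("count: ${seen.size}, last is TimedOut: ${seen.last() == Reading.TimedOut}")
    println("still subscribed: ${device.subscribed}")
}
```

The watchdog is a child of the flow's producer scope, so it is cancelled along with the collector and needs no cleanup of its own.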
e
the typical usage
callbackFlow {
    subscribe() // assume this is non-blocking
    awaitClose { unsubscribe() }
}
should be cancellable with no issues, including by a timeout in the collector. can you give an example of what you are doing?