#coroutines

Jiri Malina

04/05/2021, 1:49 PM
Hi there, how can I add a timeout to awaitClose in a callbackFlow? (details in thread)
I’m trying to transform a callback into a flow. I’m using callbackFlow and it works great. However, I’d like to add a timeout for cases when none of the terminal methods gets invoked on the callback. I tried the following, but it fails with "awaitClose() can only be invoked from the producer context", which makes sense, but I can’t think of a clean solution.
fun foo() {
        return callbackFlow {
            abc.method1(object : Callback {
                ... // non-terminal methods
                ... // terminal methods
            })
            try {
                kotlinx.coroutines.withTimeout(3000) {
                    awaitClose()
                }
            } catch (e: CancellationException) {
                // cleanup
            }
        }
    }
Any ideas? Thanks!

mateusz.kwiecinski

04/05/2021, 6:04 PM
to transform a callback into a flow
I’d ask whether transforming the callback into a regular coroutine wouldn’t be enough in your case. In most cases cancellation would be handled on the collector side, withTimeout { foo().collect { } }, but I assume you precisely want to expose a self-timeouting Flow, right? If so, I’d suggest adding a custom timeout() extension function. If I understood correctly, what you want to achieve is simply:
private fun <T> Flow<T>.timeout() = channelFlow {
    withTimeout(3000) {
        collect { send(it) }
    }
}
which would complete successfully if the inner flow completes within 3 seconds, and fail with TimeoutCancellationException otherwise. Unless I misunderstood your intentions 🤔
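A minimal sketch of how a collector might observe that timeout. It assumes the wrapper is renamed timeoutAfter (a hypothetical name, picked only to avoid clashing with any library operator) and takes the limit as a parameter. stuckFlow and collectWithTimeout are illustration-only helpers, not part of the thread.

```kotlin
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Same shape as the extension above; the name and the parameter are assumptions.
fun <T> Flow<T>.timeoutAfter(timeoutMillis: Long): Flow<T> = channelFlow {
    withTimeout(timeoutMillis) {
        collect { send(it) }
    }
}

// Hypothetical upstream: emits one value and then never completes on its own.
fun stuckFlow(): Flow<Int> = flow {
    emit(1)
    awaitCancellation()
}

// Collects the wrapped flow and reports what was seen and whether the timeout fired.
fun collectWithTimeout(source: Flow<Int>, timeoutMillis: Long): Pair<List<Int>, Boolean> =
    runBlocking {
        val seen = mutableListOf<Int>()
        val timedOut = try {
            source.timeoutAfter(timeoutMillis).collect { seen.add(it) }
            false
        } catch (e: TimeoutCancellationException) {
            // withTimeout signals expiry with this CancellationException subclass.
            true
        }
        seen to timedOut
    }
```

Because TimeoutCancellationException is a CancellationException, a collector that does not catch it explicitly may simply see its own coroutine end quietly, so catching it at the call site as above is probably the safer habit.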

Jiri Malina

04/05/2021, 7:27 PM
Thanks for the feedback @mateusz.kwiecinski!
but I assume you precisely want to expose self-timeouting Flow, right?
Yes, that’s correct 😉.
If so, I’d suggest to add custom timeout() extension function.
This approach works, but it has a small downside: I don’t have access to the original scope. The following example is a bit closer to the actual code. Notice the cancelable part.
fun foo() {
        return callbackFlow {
            val cancelable=abc.method1(object : Callback {
                ... // non-terminal methods
                ... // terminal methods
            })
            try {
                withTimeout(3000) {
                    awaitClose()
                }
            } catch (e: CancellationException) {
                cancelable.cancel()
            }
        }
    }
I tried to create a custom “awaitCloseWithTimeout” and it seems to work as expected. Having said that, it’s not exactly beautiful code, so any suggestions are more than appreciated 😉
fun foo() {
        return callbackFlow {
            val cancelable=abc.method1(object : Callback {
                ... // non-terminal methods
                ... // terminal methods
            })
            
            awaitCloseWithTimeout(TIMEOUT) { success ->
                if (!success) cancelable.cancel()
            }
        }
    }


private suspend fun ProducerScope<*>.awaitCloseWithTimeout(timeout: Long, block: suspend (Boolean) -> Unit) {
        check(kotlin.coroutines.coroutineContext[Job] === this) { "awaitClose() can only be invoked from the producer context" }
        var success = false
        try {
            withTimeoutOrNull(timeout) {
                suspendCancellableCoroutine<Unit> { cont ->
                    invokeOnClose {
                        success = true
                        cont.resume(Unit)
                    }
                }
            }
        } finally {
            block(success)
        }
    }
Looking at the code again, it might not work as expected. My manual tests passed, but I’m breaking the coroutineContext[Job] === this check, since I’m creating a new coroutine in “withTimeoutOrNull” 😞. I’m not sure what issues it might cause, but I assume the check is there for a good reason.
I should have just waited for another day with a clearer mind 😉. The code you proposed works just fine and I can simply do the cleanup like this
awaitClose { 
    if (!cancelable.isCompleted) cancelable.cancel()
}
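Putting the pieces together, a hedged sketch of how this might look end to end. Callback, Cancelable, Abc and runDemo are hypothetical stand-ins for the elided API in the thread, and timeoutAfter is an assumed name for the wrapper suggested earlier. The fake API never calls a terminal method, so the timeout path is the one exercised.

```kotlin
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Hypothetical callback API; none of these names come from a real library.
interface Callback {
    fun onValue(value: String) // non-terminal
    fun onDone()               // terminal
}

class Cancelable {
    // Stays false here because the fake API never finishes.
    var isCompleted = false
        private set
    var isCancelled = false
        private set

    fun cancel() { isCancelled = true }
}

class Abc {
    val handle = Cancelable()

    // Delivers one value synchronously and never calls a terminal method.
    fun method1(callback: Callback): Cancelable {
        callback.onValue("first")
        return handle
    }
}

fun foo(abc: Abc): Flow<String> = callbackFlow {
    val cancelable = abc.method1(object : Callback {
        override fun onValue(value: String) { trySend(value) }
        override fun onDone() { close() }
    })
    // Runs when the flow is closed or cancelled, including by a timeout applied downstream.
    awaitClose {
        if (!cancelable.isCompleted) cancelable.cancel()
    }
}

// Assumed name for the wrapper proposed earlier in the thread.
fun <T> Flow<T>.timeoutAfter(timeoutMillis: Long): Flow<T> = channelFlow {
    withTimeout(timeoutMillis) { collect { send(it) } }
}

// Returns the values seen, whether the timeout fired, and whether cleanup cancelled the handle.
fun runDemo(): Triple<List<String>, Boolean, Boolean> = runBlocking {
    val abc = Abc()
    val seen = mutableListOf<String>()
    val timedOut = try {
        foo(abc).timeoutAfter(200).collect { seen.add(it) }
        false
    } catch (e: TimeoutCancellationException) {
        true
    }
    Triple(seen, timedOut, abc.handle.isCancelled)
}
```

The design point is that awaitClose stays directly in the producer, so its context check passes, while the timeout lives in a separate wrapper that cancels the producer from outside.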
Thank you for your help! 🙇

baxter

04/05/2021, 9:18 PM
I'm working on adding something like this to the coroutines library. If you want to try implementing something custom though, this is what I used externally: https://gist.github.com/pablobaxter/50cb9873f38bf6a0e2d72c769ac112f3