Jiri Malina
04/05/2021, 1:49 PM
awaitClose on CallbackFlow? (details in thread)
The code below fails with “awaitClose() can only be invoked from the producer context”, which makes sense, but I can’t think of a clean solution.
fun foo() = callbackFlow {
    abc.method1(object : Callback {
        ... // non-terminal methods
        ... // terminal methods
    })
    try {
        // Fails: withTimeout runs this block in a new coroutine, not the producer's
        kotlinx.coroutines.withTimeout(3000) {
            awaitClose()
        }
    } catch (e: CancellationException) {
        // cleanup
    }
}
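A minimal sketch of why the check trips, assuming only kotlinx.coroutines: withTimeout runs its block in a new coroutine with its own Job, so coroutineContext[Job] inside the block is no longer the Job of the caller (in callbackFlow, the producer). Here runBlocking stands in for the producer, and sameJobInsideWithTimeout is a hypothetical helper name used only for illustration.

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Hypothetical helper: compares the Job seen outside and inside withTimeout.
fun sameJobInsideWithTimeout(): Boolean = runBlocking {
    val outer = coroutineContext[Job]                         // Job of the runBlocking coroutine
    val inner = withTimeout(1000) { coroutineContext[Job] }   // Job of the timeout coroutine
    outer === inner
}

fun main() {
    println(sameJobInsideWithTimeout()) // prints false
}
```

awaitClose() performs the same identity comparison against the producer, which is why calling it inside withTimeout throws.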
Any ideas? Thanks!
mateusz.kwiecinski
04/05/2021, 6:04 PM
“to transform a callback into a flow”
I’d ask if transforming the callback into a regular coroutine wouldn’t be enough in your case? In most cases cancellation would be handled on the collector side, withTimeout { foo().collect { } }, but I assume you precisely want to expose a self-timeouting Flow, right?
If so, I’d suggest adding a custom timeout() extension function. If I understood correctly, what you want to achieve is simply:
private fun <T> Flow<T>.timeout() = channelFlow {
    withTimeout(3000) {
        collect { send(it) }
    }
}
which would complete successfully if the inner flow completes within 3 seconds, and fail with a TimeoutCancellationException (the exception withTimeout throws) otherwise
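A rough usage sketch of such an extension, under stated assumptions: timeoutAfter and its millis parameter are hypothetical names (a parameterized variant of the timeout() above), and the stalled flow is a made-up source that emits once and then hangs. The collector here catches TimeoutCancellationException, which is what withTimeout throws.

```kotlin
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Hypothetical name; same shape as the timeout() extension, with the limit as a parameter.
fun <T> Flow<T>.timeoutAfter(millis: Long): Flow<T> = channelFlow {
    withTimeout(millis) {
        collect { send(it) } // forwards upstream values until the total time budget runs out
    }
}

fun main() = runBlocking {
    // Made-up source: emits one value, then stalls far longer than the timeout.
    val stalled = flow {
        emit(1)
        delay(10_000)
    }
    val received = mutableListOf<Int>()
    try {
        stalled.timeoutAfter(100).collect { received += it }
    } catch (e: TimeoutCancellationException) {
        println("timed out after receiving $received")
    }
}
```

Because TimeoutCancellationException is a CancellationException, a collector that does not catch it explicitly may see its coroutine end as cancelled rather than as failed.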
Unless I misunderstood your intentions 🤔
Jiri Malina
04/05/2021, 7:27 PM
“but I assume you precisely want to expose self-timeouting Flow, right?”
Yes, that’s correct 😉.
“If so, I’d suggest to add custom timeout() extension function.”
This approach works, however it has a little downside: I don’t have access to the original scope. The following example is a bit closer to the actual code. Notice the cancelable part.
fun foo() = callbackFlow {
    val cancelable = abc.method1(object : Callback {
        ... // non-terminal methods
        ... // terminal methods
    })
    try {
        withTimeout(3000) {
            awaitClose()
        }
    } catch (e: CancellationException) {
        cancelable.cancel()
    }
}
-----------------
I tried to create a custom “awaitCloseWithTimeout” and it seems to work as expected. Having said that, it’s not exactly beautiful code, so any suggestions are more than appreciated 😉
fun foo() = callbackFlow {
    val cancelable = abc.method1(object : Callback {
        ... // non-terminal methods
        ... // terminal methods
    })
    awaitCloseWithTimeout(TIMEOUT) { success ->
        if (!success) cancelable.cancel()
    }
}
private suspend fun ProducerScope<*>.awaitCloseWithTimeout(timeout: Long, block: suspend (Boolean) -> Unit) {
    check(kotlin.coroutines.coroutineContext[Job] === this) { "awaitClose() can only be invoked from the producer context" }
    var success = false // set to true only if the channel closes before the timeout
    try {
        withTimeoutOrNull(timeout) {
            suspendCancellableCoroutine<Unit> { cont ->
                // invokeOnClose accepts only one handler per channel
                invokeOnClose {
                    success = true
                    cont.resume(Unit)
                }
            }
        }
    } finally {
        block(success)
    }
}
coroutineContext[Job] === this check, since I’m creating a new coroutine in “withTimeoutOrNull” 😞. I’m not sure what issues it might cause, but I assume the check is there for a good reason.
awaitClose {
    if (!cancelable.isCompleted) cancelable.cancel()
}
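One possible alternative, sketched here under stated assumptions rather than taken from the thread: keep the plain awaitClose { } in the producer and let a child "watchdog" coroutine close the channel once the timeout elapses. This is a different technique from awaitCloseWithTimeout above; it never calls awaitClose outside the producer, so the context check stays intact. FakeSource, the shape of Callback and Cancelable, events() and timeoutMillis are all hypothetical stand-ins for abc.method1 and the real API.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the real callback API.
interface Callback {
    fun onValue(value: Int) // non-terminal
    fun onDone()            // terminal
}

class Cancelable {
    var cancelled = false
        private set
    fun cancel() { cancelled = true }
}

class FakeSource {
    val handle = Cancelable()
    // Delivers one value, then goes silent and never calls onDone().
    fun method1(callback: Callback): Cancelable {
        callback.onValue(1)
        return handle
    }
}

fun FakeSource.events(timeoutMillis: Long): Flow<Int> = callbackFlow {
    val cancelable = method1(object : Callback {
        override fun onValue(value: Int) { trySend(value) }
        override fun onDone() { close() }
    })
    // Watchdog: closes the channel if no terminal callback arrived in time.
    val watchdog = launch {
        delay(timeoutMillis)
        close()
    }
    // Runs on any close or cancellation: timeout, terminal callback, or collector cancel.
    awaitClose {
        watchdog.cancel()
        cancelable.cancel()
    }
}

fun main() = runBlocking {
    val source = FakeSource()
    val received = source.events(timeoutMillis = 100).toList()
    println(received)                // prints [1]
    println(source.handle.cancelled) // prints true
}
```

In this sketch the timeout completes the flow normally; passing an exception to close(cause) instead would surface the timeout to the collector as a failure. Cancelling the watchdog inside awaitClose matters, since the producer otherwise waits for that child to finish its delay before completing.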
baxter
04/05/2021, 9:18 PM