kenkyee
09/06/2022, 5:29 PM
ephemient
09/06/2022, 7:21 PM
val flow = callbackFlow {
    subscribe()
    awaitClose { unsubscribe() }
}
withTimeout(...) {
    flow.collect()
}
will naturally unsubscribe when the flow collector is stopped
kenkyee
09/06/2022, 7:27 PM
uli
09/07/2022, 6:32 AM
Use suspendCancellableCoroutine() to wrap it into a suspend function, not into a flow.
This suspend function can then be called inside withTimeout {}.
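A minimal sketch of that suggestion, under stated assumptions: `FakeSource` and `awaitFirst` are hypothetical names standing in for the real callback API, which the thread does not show. It wraps a single callback into a suspend function and cleans up when a timeout cancels the wait.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withTimeoutOrNull
import kotlin.coroutines.resume

// Hypothetical callback API, standing in for whatever is being wrapped.
class FakeSource {
    var listener: ((String) -> Unit)? = null
        private set
    fun subscribe(l: (String) -> Unit) { listener = l }
    fun unsubscribe() { listener = null }
}

// Suspends until the first callback value arrives.
suspend fun FakeSource.awaitFirst(): String = suspendCancellableCoroutine { cont ->
    // Runs if the coroutine is cancelled, for example by a timeout.
    cont.invokeOnCancellation { unsubscribe() }
    subscribe { value ->
        unsubscribe()
        cont.resume(value)
    }
}

fun main() = runBlocking {
    val source = FakeSource()
    // Nothing ever fires the callback here, so the timeout cancels the wait.
    val result = withTimeoutOrNull(100L) { source.awaitFirst() }
    println(result)                  // null
    println(source.listener == null) // true: the cancellation handler unsubscribed
}
```

This only fits a one-shot result. For a stream of values, the callbackFlow version is probably the better match.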
kenkyee
09/07/2022, 11:10 AM
ephemient
09/07/2022, 3:28 PM
fun <T> Flow<T>.withTimeout(timeout: Duration): Flow<T> = flow {
    withTimeout(timeout) inner@{
        emitAll(this@withTimeout)
    }
}
do you perhaps want something like this?
kenkyee
09/07/2022, 4:00 PM
ephemient
09/07/2022, 4:02 PM
kenkyee
09/07/2022, 4:04 PM
ephemient
09/07/2022, 4:50 PM
ephemient
09/07/2022, 4:56 PM
kenkyee
09/07/2022, 5:11 PM
ephemient
09/07/2022, 5:17 PM
callbackFlow {
    subscribe() // assume this is non-blocking
    awaitClose { unsubscribe() }
}
should be cancellable with no issues, including by a timeout in the collector. Can you give an example of what you are doing?
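For reference, a small runnable sketch of the claim above, assuming a hypothetical `FakeTicker` callback source and an `events()` helper (neither is from the thread). The collector is wrapped in a timeout, and the `subscribed` flag shows whether `awaitClose` ran the cleanup.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical callback source; it never emits, so only the timeout ends collection.
class FakeTicker {
    var subscribed = false
        private set
    private var listener: ((Int) -> Unit)? = null
    fun subscribe(l: (Int) -> Unit) { listener = l; subscribed = true }
    fun unsubscribe() { listener = null; subscribed = false }
}

fun FakeTicker.events(): Flow<Int> = callbackFlow {
    subscribe { trySend(it) }    // non-blocking registration
    awaitClose { unsubscribe() } // runs when the collector is cancelled or the channel closes
}

fun main() = runBlocking {
    val ticker = FakeTicker()
    val finished = withTimeoutOrNull(100L) {
        ticker.events().collect { println(it) }
        true
    }
    println(finished)          // null: the collector timed out
    println(ticker.subscribed) // false: awaitClose ran unsubscribe()
}
```

If `subscribe()` itself blocks, the timeout cannot cancel it until it returns, which is why the snippet above assumes it is non-blocking.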