galex
12/13/2023, 3:36 PM.timeout()
is called after each emit, so I made the following function:
/**
* If the first element takes too long to emit, the [onTimeout] callback will be called without disturbing the current flow
*/
fun <T> Flow<T>.timeoutFirst(duration: Long, scope: CoroutineScope, onTimeout: () -> Unit): Flow<T> = flow {
var emitted = false
scope.launch {
delay(duration)
if (!emitted) {
onTimeout()
}
}
collect {
emitted = true
emit(it)
}
}
First question:
Is there a way to get the coroutineScope where the flow will be collected so that I don't need to pass a scope?
I was using coroutineScope {}
before instead of scope.launch
but then I couldn't get it to work in unit tests under runTest
Second question:
Instead of a callback, could I use the throwing mechanism as timeout
does?
Thanks in advance! 😊franztesca
12/13/2023, 8:43 PMfun <T> Flow<T>.timeoutFirst(duration: Duration) = flow {
coroutineScope {
val values = buffer(Channel.RENDEZVOUS).produceIn(this)
withTimeout(duration) {
emit(values.receiveCatching().onClosed { return@withTimeout }.getOrThrow())
}
emitAll(values.consumeAsFlow())
}
}
aarkling
01/02/2024, 10:40 PMaarkling
01/02/2024, 10:42 PMaarkling
01/02/2024, 10:54 PMclass TimeoutFirstCancellationException : CancellationException()
fun <T> Flow<T>.timeoutFirst(duration: Duration): Flow<T> = flow {
coroutineScope {
var emitted = false
launch {
delay(duration)
if (!emitted) {
throw TimeoutFirstCancellationException()
}
}
collect {
emitted = true
emit(it)
}
}
}
aarkling
01/03/2024, 1:57 AMaarkling
01/03/2024, 1:58 AMprivate fun <T> Flow<T>.timeoutFirst(duration: Duration): Flow<T> = channelFlow {
val emitted = Job()
launch {
collect {
emitted.cancelAndJoin()
send(it)
}
}
try {
withTimeout(duration) { emitted.join() }
} catch (e: TimeoutCancellationException) {
if(emitted.isActive) throw e
}
}
galex
01/03/2024, 4:42 AMfun <T : Any?> Flow<T>.timeoutFirstNotNull(duration: Long, onTimeout: () -> Unit): Flow<T> = flow {
coroutineScope {
var emitted = false
launch {
delay(duration)
if (!emitted) {
onTimeout()
}
}
collect {
if (it != null && !emitted) {
emitted = true
}
emit(it)
}
}
}
galex
01/03/2024, 4:43 AM