# coroutines
m
basically, it's a timeout for a flow: the flow closes once the timeout has elapsed
s
Would something like this work?
fun <T> Flow<T>.timeout(millis: Long) = flow {
    this@timeout.collect {
        withTimeout(millis) {
            emit(it)
        }
    }
}
m
hm, the timeout should apply to the emissions overall, not to each one. i managed to get this solution. what do you think?
fun <T> Flow<T>.timeout(durationInMillis: Long): Flow<T> =
    flow {
        coroutineScope {
            val outerScope = this
            launch {
                try {
                    delay(durationInMillis)
                    outerScope.cancel()
                } catch (e: CancellationException) {
                    outerScope.cancel(e) // cancel outer scope on cancellation exception, too
                }
            }
            collect { emit(it) }
        }
    }
oh wait, cannot be like this
yes, it works. gonna write some tests for it
d
fun <T> Flow<T>.timeout(millis: Long) = flow {
    withTimeout(millis) {
        this@timeout.collect {
            emit(it)
        }
    }
}
m
even better solution. thanks!
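For anyone who wants to check the behavior of the last snippet, here is a minimal sketch, assuming kotlinx.coroutines 1.6 or newer. The names `overallTimeout` and `collectUntilTimeout` are hypothetical and only used for illustration; the operator body is the same as in the snippet above. Note that the flow does not complete quietly on timeout: `withTimeout` throws `TimeoutCancellationException`, which reaches the collector.

```kotlin
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Hypothetical name; the body matches the last snippet in the thread.
// The timeout covers the whole upstream collection, not each emission.
fun <T> Flow<T>.overallTimeout(millis: Long): Flow<T> = flow {
    withTimeout(millis) {
        this@overallTimeout.collect { emit(it) }
    }
}

// Hypothetical helper: collects the flow and returns the received items
// together with a flag telling whether collection ended with a timeout.
suspend fun <T> collectUntilTimeout(source: Flow<T>): Pair<List<T>, Boolean> {
    val received = mutableListOf<T>()
    return try {
        source.collect { received.add(it) }
        received to false
    } catch (e: TimeoutCancellationException) {
        // withTimeout signals the timeout with this exception.
        received to true
    }
}

fun main(): Unit = runBlocking {
    // Emits once, then stalls far longer than the timeout.
    val slow = flow {
        emit(1)
        delay(10_000)
        emit(2)
    }
    val (slowItems, slowTimedOut) = collectUntilTimeout(slow.overallTimeout(200))
    println("slow: $slowItems timedOut=$slowTimedOut") // slow: [1] timedOut=true

    // Finishes well within the timeout, so it completes normally.
    val (fastItems, fastTimedOut) = collectUntilTimeout(flowOf(1, 2, 3).overallTimeout(200))
    println("fast: $fastItems timedOut=$fastTimedOut") // fast: [1, 2, 3] timedOut=false
}
```

If the flow should just end without an exception on timeout, one option is to catch `TimeoutCancellationException` inside the operator itself; whether that is preferable depends on whether callers need to tell a timeout apart from normal completion.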