Gus
08/19/2020, 9:52 AM
Flow<Thingy> and, in some cases, that flow/job should be cancelled if no new Thingy has been emitted in the past X seconds (and I'll need to close an underlying connection when the flow is cancelled). Is there any readily-available utility to achieve this? If not, does anyone have any pointers on how to do this in an idiomatic way?
If I have to implement it myself, I'm thinking of implementing it as two separate functions: getThingy(): Flow<Thingy> and getThingy(timeoutMillis: Int): Flow<Thingy>. getThingy() would return a cancellable flow, and getThingy(timeoutMillis) would use the former function behind the scenes and add the timeout logic. I'm thinking of using Timer.schedule() in the timeout logic so I can cancel the timer and start a new one each time a new Thingy is emitted, but I wonder whether there's a better way to do this.
pedro
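08/19/2020, 11:30 AM
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

@OptIn(ExperimentalCoroutinesApi::class) // transformLatest is experimental
fun main() {
    val flow = flow {
        emit(1)
        delay(250)
        emit(2)
        delay(250)
        emit(3)
        delay(250)
        emit(4)
        delay(1250) // longer than the 1000 ms timeout below
        emit(5)
    }
    runBlocking {
        launch {
            flow.transformLatest {
                emit(it)
                // This block is restarted for every new value, so the delay
                // only runs to completion if the flow stays idle for 1000 ms.
                delay(1000)
                // Cancels the enclosing launch scope, which stops the collection.
                cancel("Timeout")
            }.collect {
                println(it)
            }
        }
    }
}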
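A possible way to fold the snippet above into the getThingy(timeoutMillis) shape from the question, as a rough sketch only. The names timeoutWhenIdle, IdleTimeoutException and FakeConnection are made up for illustration and are not part of kotlinx.coroutines. The sketch assumes the connection can be closed in a finally block inside the source flow, it uses Long rather than Int for the timeout because delay() takes a Long, and the timer only starts after the first value has arrived.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical exception type, used so the collector can tell a timeout
// apart from a normal completion.
class IdleTimeoutException(timeoutMillis: Long) :
    Exception("No value emitted for $timeoutMillis ms")

// Hypothetical operator built on transformLatest: the block is restarted for
// every upstream value, so the throw is only reached if the flow stays idle.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.timeoutWhenIdle(timeoutMillis: Long): Flow<T> =
    transformLatest { value ->
        emit(value)
        delay(timeoutMillis)
        throw IdleTimeoutException(timeoutMillis)
    }

// Stand-in for the underlying connection mentioned in the question.
class FakeConnection {
    var closed = false
        private set

    fun close() {
        closed = true
    }
}

// Plain cancellable flow; the finally block runs on completion, failure,
// or cancellation of the collection, so the connection is closed either way.
fun getThingy(connection: FakeConnection): Flow<Int> = flow {
    try {
        emit(1)
        delay(100)
        emit(2)
        delay(5_000) // idle period, longer than the timeout used in main()
        emit(3)
    } finally {
        connection.close()
    }
}

// Timeout variant layered on top of the plain one.
fun getThingy(connection: FakeConnection, timeoutMillis: Long): Flow<Int> =
    getThingy(connection).timeoutWhenIdle(timeoutMillis)

fun main() = runBlocking {
    val connection = FakeConnection()
    try {
        getThingy(connection, timeoutMillis = 500).collect { println(it) }
    } catch (e: IdleTimeoutException) {
        println("Timed out: ${e.message}")
    }
    println("Connection closed: ${connection.closed}")
}
```

Throwing a regular exception instead of calling cancel() keeps the operator independent of whichever scope happens to be collecting, and the timeout can be handled with an ordinary try/catch or the catch operator. No Timer is needed, since transformLatest already cancels and restarts the pending delay on each new value.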
Gus
08/19/2020, 1:01 PM