How do I create an "infinite" flow correctly? I'm...
# coroutines
b
How do I create an "infinite" flow correctly? I'm trying to create a flow to "infinitely" consume from a message bus. For example:
fun receive(interval: Duration) = flow {
    while (true) {
        delay(interval.toMillis())
        val messages = getMessages(interval)
        messages.forEach {
            println("Emitting $it")
            emit(it)
        }
    }
}
I can consume the flow "forever" like this:
receive(Duration.ofSeconds(1))
    .collect {
        println(it)
    }
Or limit the number of messages that I consume:
val amount = 10
receive(Duration.ofSeconds(1))
    .take(amount)
    .collect {
        println(it)
    }
I tested this and the flow "completes", but I do not understand what happens in the second case: does the while(true) loop get suspended forever? Is this a problem?
a
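It will not suspend forever. The take operator throws an AbortFlowException (a subclass of CancellationException) from your receive flow's call to emit to perform cleanup. That exception unwinds the while(true) loop, and take then catches it so the resulting flow completes normally.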
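A minimal sketch of that behaviour, assuming kotlinx.coroutines is on the classpath. The `numbers` flow and the `log` list are hypothetical stand-ins for the message-bus flow from the question, added only to make the loop exit observable. The `finally` block runs when the exception thrown from `emit` unwinds the loop:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for receive(): emits 0, 1, 2, ... forever.
fun numbers(log: MutableList<String>): Flow<Int> = flow {
    var next = 0
    try {
        while (true) {
            delay(10)
            // Once take(n) has received its last element, this call throws
            // a CancellationException subclass instead of returning.
            emit(next++)
        }
    } finally {
        // Reached when that exception unwinds the loop.
        log += "upstream loop exited"
    }
}

fun main(): Unit = runBlocking {
    val log = mutableListOf<String>()
    val received = numbers(log).take(3).toList()
    println(received) // prints [0, 1, 2]
    println(log)      // prints [upstream loop exited]
}
```

The loop is therefore not left suspended: it exits before the collection call returns, so any cleanup placed in a `finally` block around it has already run by then.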
b
@Adam Powell, isn't AbortFlowException being thrown inside another "unsafeFlow" instance?
Oh, but it's being thrown "inside" the collect method of the receive flow. I think I got it. Thanks!