https://kotlinlang.org logo
#coroutines
Title
# coroutines
r

rt

06/07/2020, 7:13 AM
Hi. How do I figure out that the downstream is exhausted (completed/whatever the right terminology is) in a flow builder? This is the code I currently have: https://pl.kotl.in/tCjm9ICUp - it will time out because my loop condition is obviously not working. In general, what I'm trying to do is have this
while (...) { delay(1000); emit(buffer); buffer.clear() }
to have timed buffering. I'm aware there are operators for this, just trying to figure things out for myself. Also, any other notes on pitfalls of my implementation would be appreciated.
f

fatih

06/07/2020, 9:49 AM
There is no way to understand if the collector is finished or not in flow builder by design. The thing is Flow is cold and when you emit a value in flow builder, it just sends the value to collector side and it is done. There is no buffer in Flow as you mentioned. (There is buffer operator but it buffers the emission and runs the collector in a separate coroutine) If you want a Flow with a buffer like you mentioned use
StateFlow
. By the way can you give a bit more details about use case and why do you need to clear buffer?
r

rt

06/07/2020, 9:51 AM
@fatih, i'm saying 'buffer' to refer to the buffer in my code. I'm doin batching in time - the code at the link is pretty much self-explanatory, so I'm not exactly wondering how to literally get an event for flow completion but rather how to build a flow that will somehow know that the upstream will not send any more elements
f

fatih

06/07/2020, 9:56 AM
If I understand correctly I think you can achieve this in the collector side. If you cancel the scope that you use to collect the flow, you will not get any elements.
r

rt

06/07/2020, 9:57 AM
What do you mean? I don't think that answers my question...
f

fatih

06/07/2020, 12:12 PM
Sorry for confusion.Let me give an example.
Copy code
val scope = CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO>)
    val testFlow = flow {
        repeat(5) {
            emit(it)
        }
    }
    scope.launch {
        testFlow
            .collect {
                // Check your condition about when to cancel 
                // After you call cancel, isActive becomes false
                if (it == 2) cancel()
                
                // If this coroutine is active do your thing
                if (isActive) {
                    println(it)
                }
            }
    }
Here in collect you can check your cancellation condition and do your stuff if the coroutine is only active
r

rt

06/07/2020, 12:14 PM
I'm not looking for conditional cancelation based on element value - I don't want to cancel at all actually. Let me think on this some more and maybe when I have an answer on my own, I'll post an update here.
👍 1
FWIW, this is what I came up with and didn't actually need the
while (isActive)
part. Not sure how efficient this is now, but anyway: https://pl.kotl.in/fJNkH2kWc
What it does is send elements downstream in chunks of up to a given buffer size. If buffer size is exceeded, it's sent immediately, but if we didn't see a fresh element for a given amount of time, we will send whatever we have.
z

Zach Klippenstein (he/him) [MOD]

06/07/2020, 2:05 PM
Don't you also need to cancel the timed send job in the first branch of that if statement? Currently if you send
n
elements and then timeout, you'll send the buffer but also send an empty list after the timeout.
1
r

rt

06/07/2020, 3:49 PM
@Zach Klippenstein (he/him) [MOD] yeah, i also discovered this while running some tests, but now i don't like that i'm basically launching a coroutine on every element. i actually came up with a workaround, maybe will publish after polishing
here's an attempt at doing this as i originally intended, with a single additional coroutine (at a cost of a couple more allocations) https://gist.github.com/tkroman/96bda51583893aa8135a4bf890348650 the idea is to first pass the flow through another flow that will emit a dummy marker value at the end of stream so that i'll know when to stop the coroutine at the next stage it's not pretty and it's hard for me to reason which one would be more performant but this version is so dirty i almost like it better 🙂
z

Zach Klippenstein (he/him) [MOD]

06/07/2020, 5:07 PM
I personally prefer the first version. It's much simpler and easier to read and reason about. There's nothing inherently wrong with launching a new coroutine on each element, especially when it makes the code so readable. I'm pretty sure there are some operators in the coroutines library that use this technique too. Launching a coroutine effectively amounts to just making a few allocations anyway. It's also pointless to speculate about performance without benchmarking.
2
r

rt

06/07/2020, 6:57 PM
mm... pointless is a strong word though. you realistically can at least ballpark performance implications. if the flow is data-intensive (say you capture mouse movements at a high rate), it doesn't matter how cheap allocations are, they are still there, and in my book
less allocations
>
more allocations
. i'm still on the fence wrt intermediate stream anyway, but think i can rely on fusion here. probably should benchmark anyway
but i was also not precisely correct abt 'new coroutine on every element', it's actually 'on every flush' so under realistic conditions not too often... benchmarks it is
z

Zach Klippenstein (he/him) [MOD]

06/07/2020, 7:11 PM
Sorry, maybe “pointless” was too harsh. What I meant was that I’m not convinced your assumption about allocation comparisons is correct, since starting new coroutines is also pretty cheap and basically amounts to “just a few allocations” itself. And optimizing complicated code is harder than optimizing simple code, so until you’ve actually found real performance issues, it’s better to prefer simpler code.
1
r

rt

06/08/2020, 6:58 AM
yup, totally agree there 🙂
I ran some tests earlier today, nothing fancy, not using jmh etc etc, but simply aggregating after chunking a stream of 1M elements, cleaner version is ~3x slower.
7 Views