Hi, anyone who could help me understand little bit ...
# coroutines
j
Hi, could anyone help me understand flows with the buffer operator a little better? I'm trying to combine 2 flows (one that must be consumed, a second one that is nice to consume). More in detail...
Basically, my idea is the following. Given this code:
val start = System.currentTimeMillis()
flow {
    repeat(1000) { counter ->
        delay(500)
        emit(counter)
    }
}.collect {
    println("Time:${System.currentTimeMillis() - start} CollectA:$it")
    if (it > 0 && it == 2) {
        delay(2000)
    }
}
output:
Time:560 CollectA:0
Time:1078 CollectA:1
Time:1578 CollectA:2
Time:4087 CollectA:3
Time:4602 CollectA:4
Time:5102 CollectA:5
As expected, the delay(2000) at index 2 slows things down, but emission simply carries on afterwards and no "data" is lost. I'd like to have a 2nd flow that just keeps emitting, where it's OK for data to be lost. Simply adding a buffer operator works as expected. Code:
val start = System.currentTimeMillis()
flow {
    repeat(6) { counter ->
        delay(500)
        emit(counter)
    }
}
    .buffer(0, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    .collect {
        println("Time:${System.currentTimeMillis() - start} CollectA:$it")
        if (it > 0 && it == 2) {
            delay(2000)
        }
    }
output: (as expected, items 3, 4 are dropped)
Time:528 CollectA:0
Time:1036 CollectA:1
Time:1538 CollectA:2
Time:3543 CollectA:5
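For reference, the library documents conflate() as a shortcut for a buffer that keeps only the latest element, which is what buffer(0, onBufferOverflow = BufferOverflow.DROP_OLDEST) amounts to. A minimal sketch of the same experiment written with conflate(), assuming a hypothetical helper counterFlow that mirrors the emitter above:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not from the thread): emits `count` integers, one every `periodMs`.
fun counterFlow(count: Int, periodMs: Long): Flow<Int> = flow {
    repeat(count) { counter ->
        delay(periodMs)
        emit(counter)
    }
}

fun main() = runBlocking {
    val start = System.currentTimeMillis()
    counterFlow(6, 500)
        .conflate() // keeps only the most recent value while the collector is busy
        .collect {
            println("Time:${System.currentTimeMillis() - start} CollectA:$it")
            if (it == 2) delay(2000) // slow collector, same as in the snippets above
        }
}
```

Exact timings will vary from run to run; the point is only that values emitted while the collector is busy are replaced by the newest one.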
Now, I'd like to mix these flows simply via merge:
- the 1st flow would have a buffer (to make it nice to collect),
- the 2nd flow would have no buffer (to make it must collect).

The problem is that if I simply use merge on the flow with the buffer, it no longer behaves the same way as before. Given this code:
merge(
    flow {
        repeat(6) { counter ->
            delay(500)
            emit(counter)
        }
    }
        .buffer(0, onBufferOverflow = BufferOverflow.DROP_OLDEST)
)
.collect {
    println("Time:${System.currentTimeMillis() - start} CollectA:$it")
    if (it > 0 && it == 2) {
        delay(2000)
    }
}
Output:
Time:545 CollectA:0
Time:1058 CollectA:1
Time:1574 CollectA:2
Time:3580 CollectA:3
Time:3580 CollectA:4
Time:3580 CollectA:5
I was hoping that merge alone wouldn't really change the behaviour. What am I missing here? Or what's the correct way to do basically this:
merge(
    // nice to collect
    flow { ... }.buffer(0, onBufferOverflow = BufferOverflow.DROP_OLDEST),
    // must collect
    flow { ... },
)
r
Not sure of the intricacies of why this happens, but I do know that buffer doesn't actually collect the flow, so perhaps you could create your own operator to do this (disclaimer: I haven't tested this)
merge(
    flow {
        upstream
            .buffer(0, BufferOverflow.DROP_OLDEST)
            .collect { emit(it) }
    },
    flow { .. },
).collect { ..}
u
Looks like even the buffer(0) is not really fused into the merge operation.
I would open another issue on that topic.
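One possible direction, offered as an untested sketch rather than a confirmed fix: the merged output above is consistent with merge collecting each source in its own coroutine and forwarding values into its own channel, which has a buffer by default, so the slow collector never makes the inner DROP_OLDEST buffer overflow. The merge documentation states that a buffer applied after it is fused into that channel, so it may be worth making that channel a rendezvous one. The names ticker and mergedWithoutExtraBuffer are hypothetical, introduced only for this illustration.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits "<tag>0", "<tag>1", ... one value every `periodMs`.
fun ticker(tag: String, count: Int, periodMs: Long): Flow<String> = flow {
    repeat(count) { i ->
        delay(periodMs)
        emit("$tag$i")
    }
}

// Assumption: buffer(Channel.RENDEZVOUS) after merge is fused into merge's own
// channel, so a slow collector back-pressures both sources directly.
fun mergedWithoutExtraBuffer(nice: Flow<String>, must: Flow<String>): Flow<String> =
    merge(
        // nice to collect: older values may be dropped while the collector is busy
        nice.buffer(0, onBufferOverflow = BufferOverflow.DROP_OLDEST),
        // must collect: no buffer, so the emitter suspends instead of dropping
        must,
    ).buffer(Channel.RENDEZVOUS)

fun main() = runBlocking {
    val start = System.currentTimeMillis()
    mergedWithoutExtraBuffer(ticker("nice", 6, 500), ticker("must", 6, 500))
        .collect {
            println("Time:${System.currentTimeMillis() - start} Collect:$it")
            if (it == "nice2") delay(2000) // slow collector, as in the thread
        }
}
```

Even if this behaves as hoped, the coroutine that forwards the "nice" flow can still hold one value in flight while the collector is busy, so the result may not match the standalone buffer example exactly.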