Jiri Bruchanov
06/06/2024, 2:07 PMJiri Bruchanov
06/06/2024, 2:07 PMflow {
repeat(1000) { counter ->
delay(500)
emit(counter)
}
}.collect {
println("Time:${System.currentTimeMillis() - start} CollectA:$it")
if (it > 0 && it == 2) {
delay(2000)
}
}
output:
Time:560 CollectA:0
Time:1078 CollectA:1
Time:1578 CollectA:2
Time:4087 CollectA:3
Time:4602 CollectA:4
Time:5102 CollectA:5
as expected, the delay(2000) of index 2 slows it down, but emitting just continues and no "data" lost.
I'd like to have basically 2nd flow, but with the idea of just continue to emit and it's OK to have data lost.
simply adding a buffer operator works as expected...
code:
val start = System.currentTimeMillis()
flow {
repeat(6) { counter ->
delay(500)
emit(counter)
}
}
.buffer(0, onBufferOverflow = BufferOverflow.DROP_OLDEST)
.collect {
println("Time:${System.currentTimeMillis() - start} CollectA:$it")
if (it > 0 && it == 2) {
delay(2000)
}
}
output: (as expected, items 3, 4 are dropped)
Time:528 CollectA:0
Time:1036 CollectA:1
Time:1538 CollectA:2
Time:3543 CollectA:5
Now, I'd like to mix these flows simply via merge,
- 1st flow would be with buffer (to achieve of nice to collect),
- 2nd flow would be without buffer (to achieve of must collect)
The problem is if I simply use merge on flow with buffer, it doesn't work anymore in same way as it was.
Having this code:
merge(
flow {
repeat(6) { counter ->
delay(500)
emit(counter)
}
}
.buffer(0, onBufferOverflow = BufferOverflow.DROP_OLDEST)
)
.collect {
println("Time:${System.currentTimeMillis() - start} CollectA:$it")
if (it > 0 && it == 2) {
delay(2000)
}
}
Output:
Time:545 CollectA:0
Time:1058 CollectA:1
Time:1574 CollectA:2
Time:3580 CollectA:3
Time:3580 CollectA:4
Time:3580 CollectA:5
I was hoping that simply merge shouldn't really change the behaviour.
What I'm missing here ? or what's the correct way to do basically this
merge(
//nice to collect
flowOf { ... }.buffer(0, onBufferOverflow = BufferOverflow.DROP_OLDEST),
//must collect
flowOf { ... },
)
Jiri Bruchanov
06/06/2024, 2:25 PMreline
06/07/2024, 6:08 AMbuffer
doesn't actually collect the flow, so perhaps you could create your own operator to do this (disclaimer, I haven't tested this)
merge(
flow {
upstream
.buffer(0, DROP_OLDEST)
.collect { emit(it) }
},
flow { .. },
).collect { ..}
uli
06/07/2024, 10:06 AMuli
06/07/2024, 10:15 AM.buffer(0)
after the merge fixes the issue:
Applications of flowOn, buffer, and produceIn _after_ this operator are fused with its concurrent merging so that only one properly configured channel is used for execution of merging logic.uli
06/07/2024, 10:25 AMuli
06/07/2024, 10:26 AM