# coroutines
a
Hi Everyone, Please help. I was trying to do:
  2sec     2sec     2sec
------[A]------[B]------[C]------...----------------> InitialFlow
       \        |        | 
        \      drop      drop
         \
     5sec \    5sec        5sec
----------[1]---------[2]---------[3]-----|> AnotherFlow

result: [A1, A2, A3]
So I have `InitialFlow`, which emits at short intervals (every 2 seconds), and each item is then transformed into `AnotherFlow`, which takes longer to finish (15 seconds in total). I would like to drop the other incoming items emitted by `InitialFlow` while `AnotherFlow` isn't finished. I've tried:
flow {
    delay(2000)
    emit("A")
    delay(2000)
    emit("B")
    delay(2000)
    emit("C")
}.buffer(0, BufferOverflow.DROP_LATEST)
    .onEach {
        println("Event for $it")
    }
    .flatMapConcat {
        flow {
            delay(5000)
            emit("${it}1")
            delay(5000)
            emit("${it}2")
            delay(5000)
            emit("${it}3")
        }
    }
    .onEach {
        println(it)
    }
    .launchIn(scope)
But for some reason this is the result:
Event for A
A1
A2
A3
Event for B
B1
B2
B3
It still processes Event B even though I have `.buffer(0, BufferOverflow.DROP_LATEST)`. Why does it still process Event B? Is there a way to do this? Thanks in advance.
h
In the `buffer()` documentation, this is mentioned for the `onBufferOverflow` parameter:
onBufferOverflow - configures an action on buffer overflow (optional, defaults to SUSPEND, supported only when capacity >= 0 or capacity == Channel.BUFFERED, implicitly creates a channel with at least one buffered element).
So if it implicitly creates a channel with at least one buffered element, that would explain why you get B as well: B is stored in the buffer while A is processed, and C is discarded.
Regarding a solution... I tried to create a `SharedFlow` that will be the upstream, but it fails with:
replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy DROP_LATEST
Since I couldn't find an already implemented operator suitable for your use case, I came up with a custom solution. Instead of `buffer(0, BufferOverflow.DROP_LATEST)`, use `rendezvous()`, defined as follows:
fun <T> Flow<T>.rendezvous(): Flow<T> = flow {
    coroutineScope {
        val channel = Channel<T>()
        launch {
            this@rendezvous.collect { channel.offer(it) }
            channel.close()
        }
        channel.consumeEach { emit(it) }
    }
}
See if it works for your use case @Archie
a
@Halex, your solution works, and it can be simplified:
fun <T> Flow<T>.rendezvous(): Flow<T> = flow {
    coroutineScope {
        val channel = produce(capacity = Channel.RENDEZVOUS) {
            collect { offer(it) }
        }
        channel.consumeEach { emit(it) }
    }
}
I still feel like `.buffer(RENDEZVOUS, DROP_LATEST)` should work this way.
h
Good idea with `produce()` @Archie, we could actually drop the `channel` variable altogether now:
fun <T> Flow<T>.rendezvous(): Flow<T> = flow {
    coroutineScope {
        produce(capacity = Channel.RENDEZVOUS) { collect { offer(it) } }
            .consumeEach { emit(it) }
    }
}
"I still feel like `.buffer(RENDEZVOUS, DROP_LATEST)` should work this way."
I have the same opinion. Maybe it would be helpful for everyone if you described the issue on the issue tracker, so that we either understand the reasons behind the implementation, get a better solution or propose a helpful change in the library.
a
I posted on the GitHub page here: https://github.com/Kotlin/kotlinx.coroutines/issues/2391. I'll wait for a reply there first. If I get no reply within the day, I'll probably file it in YouTrack as well.
m
I actually just found a simpler solution using `channelFlow`. I posted it here: https://stackoverflow.com/a/64857490/4137489
fun <T> Flow<T>.dropIfBusy(): Flow<T> = channelFlow {
    collect { offer(it) }
}.buffer(0)
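Since this thread, `offer` has been deprecated in kotlinx.coroutines in favor of `trySend`. Below is a minimal sketch of the same idea using `trySend`, assuming kotlinx.coroutines 1.5 or later. The `main` harness, the shortened 200 ms / 500 ms delays, and the slow collector (standing in for the `flatMapConcat` stage from the question) are illustrative assumptions, not part of the posted answer:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Same shape as dropIfBusy() above, with the deprecated offer() swapped for trySend().
// buffer(RENDEZVOUS) is fused into the channelFlow's channel, so trySend() succeeds
// only while the downstream collector is suspended waiting for the next element.
fun <T> Flow<T>.dropIfBusy(): Flow<T> = channelFlow {
    this@dropIfBusy.collect { trySend(it) } // a failed trySend silently drops the item
}.buffer(Channel.RENDEZVOUS)

fun main() = runBlocking<Unit> {
    flow {
        delay(200); emit("A")
        delay(200); emit("B")
        delay(200); emit("C")
    }
        .dropIfBusy()
        .collect { item ->
            // Slow collector (assumption): stands in for the 15-second inner flow.
            println("Event for $item")
            repeat(3) { i ->
                delay(500)
                println("$item${i + 1}")
            }
        }
}
```

With these timings, B and C arrive while A is still being processed, so only `Event for A`, `A1`, `A2`, `A3` should be printed. As noted later in the thread, this approach has edge cases: for example, an upstream that emits immediately could lose its first element if the collector has not started receiving yet.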
h
@marstran really cool solution 😄 You're using the fact that the call to `buffer()` doesn't create a new channel but modifies the one upstream instead. I'm going to keep that in mind for the future, thank you for the idea 👍
m
Yep 🙂
h
Here's an interesting related article.
k
it seems like you're looking for dropping while busy
The above is something that I worked on for a while in conjunction with @elizarov, but I ultimately dropped the issue since the inner workings of flow backpressure were altered when `SharedFlow`/`StateFlow` were being worked on
note that the solution within the stackoverflow post has some edge cases, but the general term you're looking for is dropping elements when backpressure is applied
h
Awesome! Thank you for the useful intervention. Do you know if there is any intention of adding this backpressure drop in the Coroutines library? Or maybe any warning (besides the documentation that I mentioned in the first reply on the thread, which is kind of hidden) that `buffer(RENDEZVOUS, DROP_LATEST)` doesn't really do what it sounds like?
k
I'm not sure, I haven't looked into it in a bit. I would recommend opening an issue on the kotlinx.coroutines github if you're having trouble
h
The OP already did, so we'll see if it goes somewhere. Thank you again!
For anybody still interested in this, the answer was kindly provided by @elizarov here