Archie
11/14/2020, 6:45 PM2sec 2sec 2sec
------[A]------[B]------[C]------...----------------> InitailFlow
\ | |
\ drop drop
\
5sec \ 5sec 5sec
----------[1]---------[2]---------[3]-----|> AnotherFlow
result: [A1, A2, A3]
So I have InitailFlow
which emits a short amount of time (2 seconds) which is then transformed to AnotherFlow
which takes longer to finish (15 seconds in total)... I would like to drop the other incoming items emitted by the InitialFlow
while AnotherFlow
isn't finished...
I've tried:
flow{
delay(2000)
emit("A")
delay(2000)
emit("B")
delay(2000)
emit("C")
}.buffer(0, BufferOverflow.DROP_LATEST)
.onEach {
println("Event for $it")
}
.flatMapConcat {
flow {
delay(5000)
emit("${it}1")
delay(5000)
emit("${it}2")
delay(5000)
emit("${it}3")
}
}
.onEach {
println(it)
}
.launchIn(scope)
But for some reason this is the result:
Event for A
A1
A2
A3
Event for B
B1
B2
B3
It still process Event B for some reason even when I have a
.buffer(0, BufferOverflow.DROP_LATEST)
.
Why does it still process Event B?
Is there a way to do this? Thanks in advance.Halex
11/15/2020, 9:22 PMbuffer()
documentation for the onBufferOverflow
parameter it is mentioned this:
onBufferOverflow - configures an action on buffer overflow (optional, defaults to SUSPEND, supported only when capacity >= 0 or capacity == Channel.BUFFERED, implicitly creates a channel with at least one buffered element).
So if it implicitly creates a channel with at least one buffered element, that would explain why you get B as well (because it is stored in the buffer while A is processed and C is discarded)SharedFlow
that will be the upstream but it fails with
replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy DROP_LATEST
buffer(0, BufferOverflow.DROP_LATEST)
use rendezvous()
, defined as follows:
fun <T> Flow<T>.rendezvous(): Flow<T> = flow {
coroutineScope {
val channel = Channel<T>()
launch {
this@rendezvous.collect { channel.offer(it) }
channel.close()
}
channel.consumeEach { emit(it) }
}
}
See if it works for your use case @ArchieArchie
11/16/2020, 10:43 AMfun <T> Flow<T>.rendezvous(): Flow<T> = flow {
coroutineScope {
val channel = produce(capacity = Channel.RENDEZVOUS) {
collect { offer(it) }
}
channel.consumeEach { emit(it) }
}
}
I still feel like .buffer(RENDEVOUS, DROP_LATEST)
should work this way.Halex
11/16/2020, 11:00 AMproduce()
@Archie , we could actually drop the channel
variable altogether now
fun <T> Flow<T>.rendezvous(): Flow<T> = flow {
coroutineScope {
produce(capacity = Channel.RENDEZVOUS) { collect { offer(it) } }
.consumeEach { emit(it) }
}
}
I still feel likeI have the same opinion. Maybe it would be helpful for everyone if you described the issue on the issue tracker, so that we either understand the reasons behind the implementation, get a better solution or propose a helpful change in the library.should work this way..buffer(RENDEVOUS, DROP_LATEST)
Archie
11/16/2020, 11:02 AMmarstran
11/16/2020, 11:51 AMchannelFlow
. I posted it here: https://stackoverflow.com/a/64857490/4137489
fun <T> Flow<T>.dropIfBusy(): Flow<T> = channelFlow {
collect { offer(it) }
}.buffer(0)
Halex
11/16/2020, 12:21 PMbuffer()
doesn't create a new channel but modifies the one upstream instead. I'm going to keep that in mind for the future, thank you for the idea :thumbsup_all:marstran
11/16/2020, 12:32 PMHalex
11/16/2020, 3:34 PMkevin.cianfarini
11/16/2020, 7:32 PMHalex
11/16/2020, 8:18 PMbuffer(RENDEZVOUS, DROP_LATEST)
doesn't really do what it sounds like?kevin.cianfarini
11/16/2020, 8:19 PMHalex
11/16/2020, 8:24 PM