# flow

Robert Wijas

07/02/2021, 1:53 PM
/**
 * Implements function from
 * [RxJava](<http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Observable.html#throttleLatest-long-java.util.concurrent.TimeUnit->)
 */
internal fun <T> Flow<T>.throttleLatest(timeout: Duration): Flow<T> =
    conflate().transform {
        emit(it)
        delay(timeout)
    }
Hey, am I missing any flaws in this implementation? Thx 👋

ephemient

07/02/2021, 7:44 PM

https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.png

Rx doesn't emit that final yellow (#FFFE4B) item in the marble diagram, but Flow.conflate() will

Robert Wijas

07/02/2021, 7:55 PM
Are you sure? I think it will eventually be emitted if nothing else is emitted. It’s just that the collection ends before the throttle window does, so it doesn’t have a chance to be emitted. Compare the green one, for example. Thx for your answer!

ephemient

07/02/2021, 7:58 PM
if the upstream observable/flow ends within the window, that's what will happen
Rx will drop that last element, Flow won't. whether that matters to you is a different question, but they're not the same

Robert Wijas

07/02/2021, 8:02 PM
🤔 I think I’m missing this subtle difference. I’m gonna write a test for this case, maybe this will help me understand why this yellow one would be emitted in my implementation. Thanks!

ephemient

07/02/2021, 8:17 PM
a concrete example:
Observable.fromIterable(1..10).throttleLatest(1, TimeUnit.SECONDS).blockingForEach(::println) // (Rx) prints "1\n"
runBlocking { (1..10).asFlow().throttleLatest(Duration.seconds(1)).collect(::println) } // (yours) prints "1\n10\n"
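
The Flow half of the comparison above can be reproduced as a standalone program. This is a minimal sketch, assuming Kotlin 1.6+ and kotlinx.coroutines 1.6+ (for `100.milliseconds` and `delay(Duration)`); `collectThrottled` is a hypothetical helper name and the 100 ms window is an arbitrary choice.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// The operator from the original question, unchanged in behavior.
fun <T> Flow<T>.throttleLatest(timeout: Duration): Flow<T> =
    conflate().transform {
        emit(it)       // pass the current value downstream
        delay(timeout) // hold the window open; conflate() keeps only the latest upstream value meanwhile
    }

// Hypothetical helper: collects a source that completes well inside the first window.
fun collectThrottled(window: Duration): List<Int> = runBlocking {
    (1..10).asFlow().throttleLatest(window).toList()
}

fun main() {
    println(collectThrottled(100.milliseconds))
}
```

Per the example above, this should yield 1 and then 10: the source finishes inside the first window, but the conflated buffer still holds the latest value when that window ends, so it is emitted rather than dropped.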

Robert Wijas

07/04/2021, 1:28 PM
Thanks, I get it now. So my implementation behaves like `Observable.throttleLatest` with `emitLast` set to `true`. That’s actually what I wanted 🙂

ephemient

07/05/2021, 8:46 AM
still not entirely the same as `emitLast = true`: Rx will terminate as soon as the last item is emitted, while your Flow operator will continue waiting
there are working implementations of those operators here: https://github.com/Kotlin/kotlinx.coroutines/pull/2128 however, those use the internal `scopedFlow`, so they can't be exactly re-implemented outside of the kotlinx.coroutines repo
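
The "continue waiting" point can be observed by timing the `conflate().transform { ... }` version: the flow only completes once the `delay` following the last emission has elapsed. A rough sketch, assuming Kotlin 1.6+ and kotlinx.coroutines 1.6+; `measureTrailingWait` is a hypothetical helper, the 200 ms window is arbitrary, and the timings are wall-clock approximations.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Same operator as in the original question.
fun <T> Flow<T>.throttleLatest(timeout: Duration): Flow<T> =
    conflate().transform {
        emit(it)
        delay(timeout) // also runs after the final item, which delays completion
    }

// Hypothetical helper: returns (ms until the last emission, ms until the flow completed).
fun measureTrailingWait(window: Duration): Pair<Long, Long> = runBlocking {
    val start = System.nanoTime()
    var lastEmissionMs = 0L
    (1..10).asFlow().throttleLatest(window).collect {
        // Overwritten on every emission, so it ends up holding the time of the last one.
        lastEmissionMs = (System.nanoTime() - start) / 1_000_000
    }
    val completedMs = (System.nanoTime() - start) / 1_000_000
    lastEmissionMs to completedMs
}

fun main() {
    val (lastEmissionMs, completedMs) = measureTrailingWait(200.milliseconds)
    println("last item after ~$lastEmissionMs ms, flow completed after ~$completedMs ms")
}
```

The gap between the two numbers should be about one window long. Whether that trailing wait matters depends on the use case, for example if something downstream is waiting for the flow to complete.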
r

Robert Wijas

07/05/2021, 11:36 AM
The remaining differences don’t matter in my use case, it’s close enough 🙂 Thanks again for your detailed explanation!