Robert Wijas
07/02/2021, 1:53 PM
/**
 * Implements `throttleLatest` from
 * [RxJava](http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Observable.html#throttleLatest-long-java.util.concurrent.TimeUnit-)
 */
internal fun <T> Flow<T>.throttleLatest(timeout: Duration): Flow<T> =
    conflate().transform {
        emit(it)
        delay(timeout)
    }
Hey, am I missing any flaws in this implementation? Thx 👋
ephemient
07/02/2021, 7:44 PM
https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.png
Robert Wijas
07/02/2021, 7:55 PM
ephemient
07/02/2021, 7:58 PM
Robert Wijas
07/02/2021, 8:02 PM
ephemient
07/02/2021, 8:17 PM
Observable.fromIterable(1..10).throttleLatest(1, TimeUnit.SECONDS).blockingForEach(::println) // (Rx) prints "1\n"
runBlocking { (1..10).asFlow().throttleLatest(Duration.seconds(1)).collect(::println) } // (yours) prints "1\n10\n"
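The comparison above can be made self-contained. The sketch below is only an illustration, not part of the original messages: it assumes RxJava 3 and kotlinx.coroutines are on the classpath and Kotlin 1.6+ for `100.milliseconds`, and the helper names `rxDefault`, `rxEmitLast` and `flowVersion` are made up for this example. It also exercises the RxJava overload `throttleLatest(timeout, unit, emitLast)`, which emits the pending item when the upstream completes.

```kotlin
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// The operator from the first message, repeated so the sketch compiles on its own.
fun <T> Flow<T>.throttleLatest(timeout: Duration): Flow<T> =
    conflate().transform {
        emit(it)
        delay(timeout)
    }

// Rx default (emitLast = false): the item still pending at completion is dropped.
fun rxDefault(): List<Int> =
    Observable.fromIterable(1..10)
        .throttleLatest(100, TimeUnit.MILLISECONDS)
        .toList()
        .blockingGet()

// Rx with emitLast = true: the pending item is emitted when the upstream completes.
fun rxEmitLast(): List<Int> =
    Observable.fromIterable(1..10)
        .throttleLatest(100, TimeUnit.MILLISECONDS, true)
        .toList()
        .blockingGet()

// Flow version: the conflated latest item is emitted after the delay, then the flow completes.
fun flowVersion(): List<Int> = runBlocking {
    (1..10).asFlow().throttleLatest(100.milliseconds).toList()
}

fun main() {
    println(rxDefault())   // [1]
    println(rxEmitLast())  // [1, 10]
    println(flowVersion()) // [1, 10]
}
```

For this synchronous source the Flow version yields the same items as the `emitLast = true` overload. The timing still differs: the Flow version waits out the delay before emitting the last item, while RxJava emits it as soon as the upstream completes.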
Robert Wijas
07/04/2021, 1:28 PM
Observable.throttleLatest with emitLast set to true. That’s actually what I wanted 🙂
ephemient
07/05/2021, 8:46 AM
internal scopedFlow so they can't be exactly re-implemented outside of the kotlinx.coroutines repo
Robert Wijas
07/05/2021, 11:36 AM