// StavFX
// 10/19/2020, 9:32 PM
/**
* Similar to [Observable.replay], but instead of replaying the last N items, it will replay one
* latest item per key as defined by [keySelector].
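* The upstream subscription is shared between all subscribers through `replay(1).refCount()`,
* so the source stays subscribed only while at least one subscriber is present.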
*/
fun <T, K> Observable<T>.replayLatestPerKey(keySelector: (T) -> K): Observable<T> {
    // Shared part: a running snapshot of the latest item per key. Every entry carries a per-key
    // version that is incremented each time that key receives a new item, so two updates of the
    // same key can always be told apart (a wall-clock/nanoTime stamp can repeat, a counter cannot).
    // The snapshot map is immutable (`+` creates a new map), so it is safe to hand the same
    // instance to several subscribers. The initial empty map is emitted too, but it has no
    // entries and therefore produces no downstream items.
    val snapshots = scan(emptyMap<K, Pair<Long, T>>()) { latestByKey, item ->
        val key = keySelector(item)
        val nextVersion = (latestByKey[key]?.first ?: 0L) + 1L
        latestByKey + (key to (nextVersion to item))
    }
        // replay(1) hands the most recent snapshot to late subscribers; refCount() keeps a single
        // upstream subscription alive while there is at least one subscriber.
        .replay(1)
        .refCount()

    // Per-subscriber part: every snapshot is expanded into all of its entries, so each subscriber
    // has to skip the entries it has already received. Instead of remembering every (key, version)
    // ever seen (which grows without bound), only the last delivered version per key is kept, so
    // the bookkeeping is bounded by the number of distinct keys. `defer` gives each subscriber its
    // own bookkeeping map; it is only touched from that subscriber's serialized event stream.
    return Observable.defer {
        val deliveredVersions = HashMap<K, Long>()
        snapshots
            .flatMapIterable { snapshot -> snapshot.entries }
            .filter { (key, versioned) ->
                val version = versioned.first
                val previous = deliveredVersions.put(key, version)
                // Pass the entry through only if this subscriber has not seen this version yet.
                previous == null || previous < version
            }
            .map { (_, versioned) -> versioned.second }
    }
}