#rx

StavFX

10/19/2020, 9:32 PM
Related to the question above, but this is the actual implementation the problem comes from. Maybe my approach is flawed in the first place and I can tackle this in a way that avoids the original question altogether (although I'm still curious whether there's a solution to that too).
/**
 * Similar to [Observable.replay], but instead of replaying the last N items, it will replay one
 * latest item per key as defined by [keySelector].
 * This makes the Observable hot.
 */
fun <T, K> Observable<T>.replayLatestPerKey(keySelector: (T) -> K): Observable<T> {
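   // Accumulate the latest timestamped item per key; each emission is the full map so far.
   return scan(emptyMap<K, Timed<T>>()) { map, item ->
      val key = keySelector(item)
      val value = Timed(item, System.nanoTime(), TimeUnit.NANOSECONDS)
      map + (key to value)
   }
      // Share one upstream subscription and replay the newest map to late subscribers.
      .replay(1)
      .refCount()
      // Flatten each map back into individual entries.
      .flatMapIterable { it.entries }
      // Per subscriber, drop entries already seen (same key and timestamp).
      .distinct { (key, timedValue) -> "$key@${timedValue.time()}" }
      .map { (_, timedValue) -> timedValue.value() }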
}
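
To make the intended replay semantics concrete, here is a rough plain-Kotlin sketch (no RxJava involved) of what a late subscriber would presumably expect to receive: one item per key, the most recent one. It only models the `scan` accumulator step; `Reading` and `latestPerKey` are hypothetical names made up for this illustration and are not part of the snippet above or of any library.

```kotlin
// Hypothetical item type, used only for this illustration.
data class Reading(val sensor: String, val value: Int)

// Models the map-accumulating step: fold the items into a map that keeps
// only the newest item for each key, then return the surviving items.
fun <T, K> latestPerKey(items: List<T>, keySelector: (T) -> K): List<T> =
    items.fold(emptyMap<K, T>()) { map, item -> map + (keySelector(item) to item) }
        .values
        .toList()

fun main() {
    val emitted = listOf(
        Reading("a", 1),
        Reading("b", 2),
        Reading("a", 3),
    )
    // A subscriber arriving after these three emissions should get the
    // latest Reading for "a" and the latest Reading for "b".
    println(latestPerKey(emitted) { it.sensor })
    // prints [Reading(sensor=a, value=3), Reading(sensor=b, value=2)]
}
```

Note that the replayed order follows the first appearance of each key, not the time of the latest update, because updating an existing key in the map keeps its original position.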