saket
07/04/2021, 5:20 AM
Arkadii Ivanov
07/04/2021, 12:22 PMfun <T> Observable<T>.replayShare(): Observable<T> {
// Atomic holder for the subject that downstream observers subscribe to.
// It starts as a ReplaySubject with bufferSize = 1 and is swapped for a fresh
// one whenever the upstream is about to terminate (see connect below).
var subj by AtomicReference<ReplaySubject<T>>(ReplaySubject(bufferSize = 1))
val obs =
object : ConnectableObservable<T> {
// Observers are attached to whichever subject is current at subscription time,
// never to the upstream directly.
override fun subscribe(observer: ObservableObserver<T>) {
subj.subscribe(observer)
}
override fun connect(onConnect: ((Disposable) -> Unit)?) {
// The SerialDisposable is handed to the caller before the upstream is subscribed;
// its `set` is passed to getObserver below.
// NOTE(review): presumably getObserver calls it with the upstream subscription's
// disposable, making the connection cancellable through this handle - confirm.
val disposable = SerialDisposable()
onConnect?.invoke(disposable)
this@replayShare
// Replace the shared subject with a new, empty one just before the upstream terminates.
.doOnBeforeTerminate { subj = ReplaySubject(bufferSize = 1) }
// `subj` is read here, at connect time, so upstream events are routed into the
// subject that was current when this connection was made.
.subscribe(subj.getObserver(disposable::set))
}
}
// NOTE(review): refCount() presumably connects on the first subscriber and
// disconnects when the last one leaves - confirm against the Reaktive docs.
return obs.refCount()
}
I verified it with the tests from the RxReplayingShare repository.
Arkadii Ivanov
07/04/2021, 12:26 PM
A version with a `defaultValue` argument:
/**
 * Shares this [Observable] through the factory-based `replayShare` overload,
 * supplying a factory that creates a [ReplaySubject] with a buffer size of 1.
 *
 * The factory is invoked by that overload each time it needs a new subject.
 */
fun <T> Observable<T>.replayShare(): Observable<T> {
    return replayShare(subjectFactory = { ReplaySubject<T>(bufferSize = 1) })
}
/**
 * Shares this [Observable] through the factory-based `replayShare` overload,
 * supplying a factory whose subjects are pre-seeded with [defaultValue].
 *
 * Each subject the factory creates is a [ReplaySubject] with a buffer size of 1
 * that has [defaultValue] pushed into it via `onNext` before it is returned.
 *
 * @param defaultValue the value emitted into every newly created subject.
 */
fun <T> Observable<T>.replayShare(defaultValue: T): Observable<T> =
    replayShare(
        subjectFactory = {
            val seededSubject = ReplaySubject<T>(bufferSize = 1)
            seededSubject.onNext(defaultValue)
            seededSubject
        },
    )
/**
 * Wraps this [Observable] in a [ConnectableObservable] backed by a swappable [Subject]
 * and returns it with `refCount()` applied.
 *
 * Downstream observers subscribe to the subject currently stored in an
 * [AtomicReference]. On `connect`, the upstream is subscribed with an observer
 * obtained from that subject, and the stored subject is replaced with a new one
 * from [subjectFactory] just before the upstream terminates.
 *
 * @param subjectFactory creates the initial subject and every replacement subject.
 */
private fun <T> Observable<T>.replayShare(subjectFactory: () -> Subject<T>): Observable<T> {
    val upstream = this
    val subjectRef = AtomicReference(subjectFactory())

    val connectable =
        object : ConnectableObservable<T> {
            override fun subscribe(observer: ObservableObserver<T>) {
                // Always attach to the subject that is current right now.
                subjectRef.value.subscribe(observer)
            }

            override fun connect(onConnect: ((Disposable) -> Unit)?) {
                // Hand the connection handle out before touching the upstream.
                val connection = SerialDisposable()
                onConnect?.invoke(connection)

                // Swap in a fresh subject right before the upstream terminates.
                val resettingUpstream =
                    upstream.doOnBeforeTerminate { subjectRef.value = subjectFactory() }

                // The subject is read at connect time; upstream events go to that instance.
                val relay = subjectRef.value.getObserver(connection::set)
                resettingUpstream.subscribe(relay)
            }
        }

    return connectable.refCount()
}
saket
07/06/2021, 8:45 PM
saket
07/06/2021, 8:45 PM
Arkadii Ivanov
07/06/2021, 11:03 PM