# reaktive
s
Is it possible to recreate ReplayingShare with reaktive’s operators? https://github.com/JakeWharton/RxReplayingShare
a
I found the following way:
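fun <T> Observable<T>.replayShare(): Observable<T> {
    // Holds the current replay subject; it is swapped for a fresh one when the upstream terminates.
    var subj by AtomicReference<ReplaySubject<T>>(ReplaySubject(bufferSize = 1))

    val obs =
        object : ConnectableObservable<T> {
            override fun subscribe(observer: ObservableObserver<T>) {
                // Downstream observers subscribe to the current subject, which replays the last value.
                subj.subscribe(observer)
            }

            override fun connect(onConnect: ((Disposable) -> Unit)?) {
                // Expose the connection's disposable so that refCount can disconnect the upstream.
                val disposable = SerialDisposable()
                onConnect?.invoke(disposable)

                this@replayShare
                    // Before a terminal event is delivered, swap in a fresh subject
                    // so that the next connection starts with an empty cache.
                    .doOnBeforeTerminate { subj = ReplaySubject(bufferSize = 1) }
                    .subscribe(subj.getObserver(disposable::set))
            }
        }

    // Connect on the first subscriber and disconnect when the last one disposes.
    return obs.refCount()
}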
I verified it with the tests from the RxReplayingShare repository.
Or if you also need the defaultValue argument:
fun <T> Observable<T>.replayShare(): Observable<T> =
    replayShare { ReplaySubject(bufferSize = 1) }

fun <T> Observable<T>.replayShare(defaultValue: T): Observable<T> =
    replayShare {
        ReplaySubject<T>(bufferSize = 1).apply {
            onNext(defaultValue)
        }
    }

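private fun <T> Observable<T>.replayShare(subjectFactory: () -> Subject<T>): Observable<T> {
    // Holds the current subject; it is replaced via subjectFactory when the upstream terminates.
    val subj = AtomicReference(subjectFactory())

    val obs =
        object : ConnectableObservable<T> {
            override fun subscribe(observer: ObservableObserver<T>) {
                // Downstream observers subscribe to the current subject, which replays the cached value.
                subj.value.subscribe(observer)
            }

            override fun connect(onConnect: ((Disposable) -> Unit)?) {
                // Expose the connection's disposable so that refCount can disconnect the upstream.
                val disposable = SerialDisposable()
                onConnect?.invoke(disposable)
                this@replayShare
                    // Before a terminal event is delivered, reset the cache with a fresh subject.
                    .doOnBeforeTerminate { subj.value = subjectFactory() }
                    .subscribe(subj.value.getObserver(disposable::set))
            }
        }

    // Connect on the first subscriber and disconnect when the last one disposes.
    return obs.refCount()
}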
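A minimal usage sketch of the behaviour this is meant to reproduce. It assumes the no-argument replayShare() above is in scope; replayShareDemo is a hypothetical helper name used only for illustration, and the import paths are the ones I believe reaktive uses. A late subscriber should get the last value, and the cached value should survive while there are no subscribers:

```kotlin
import com.badoo.reaktive.observable.subscribe
import com.badoo.reaktive.subject.publish.PublishSubject

// Hypothetical demo helper, not part of reaktive.
// Relies on the replayShare() extension shown above.
fun replayShareDemo(): Triple<List<Int>, List<Int>, List<Int>> {
    val firstSeen = mutableListOf<Int>()
    val secondSeen = mutableListOf<Int>()
    val thirdSeen = mutableListOf<Int>()

    val source = PublishSubject<Int>()
    val shared = source.replayShare()

    // The first subscriber makes refCount connect to the source.
    val first = shared.subscribe(onNext = { firstSeen += it })
    source.onNext(1)

    // A late subscriber should get the cached value 1 right away.
    val second = shared.subscribe(onNext = { secondSeen += it })
    source.onNext(2)

    // Disposing the last subscriber disconnects from the source,
    // but the subject keeps the last value because the source did not terminate.
    first.dispose()
    second.dispose()

    // A new subscriber should still get the cached value 2.
    val third = shared.subscribe(onNext = { thirdSeen += it })
    third.dispose()

    return Triple(firstSeen, secondSeen, thirdSeen)
}
```

If the operator behaves like RxReplayingShare, the three lists should come out as [1, 2], [1, 2] and [2].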
s
Awesome, thank you so much!
Is there any value in offering this as a first party operator?
a
I will keep this in mind. If there are more requests, we will consider it. Thanks!