saket

    saket

    1 year ago
    Is it possible to recreate ReplayingShare with reaktive’s operators? https://github.com/JakeWharton/RxReplayingShare
    Arkadii Ivanov

    Arkadii Ivanov

    1 year ago
    I found the following way:
    fun <T> Observable<T>.replayShare(): Observable<T> {
        var subj by AtomicReference<ReplaySubject<T>>(ReplaySubject(bufferSize = 1))
    
        val obs =
            object : ConnectableObservable<T> {
                override fun subscribe(observer: ObservableObserver<T>) {
                    subj.subscribe(observer)
                }
    
                override fun connect(onConnect: ((Disposable) -> Unit)?) {
                    val disposable = SerialDisposable()
                    onConnect?.invoke(disposable)
    
                    this@replayShare
                        .doOnBeforeTerminate { subj = ReplaySubject(bufferSize = 1) }
                        .subscribe(subj.getObserver(disposable::set))
                }
            }
    
        return obs.refCount()
    }
    I verified it with the tests from the RxReplayingShare repository.
    Or if you also need the
    defaultValue
    argument:
    fun <T> Observable<T>.replayShare(): Observable<T> =
        replayShare { ReplaySubject(bufferSize = 1) }
    
    fun <T> Observable<T>.replayShare(defaultValue: T): Observable<T> =
        replayShare {
            ReplaySubject<T>(bufferSize = 1).apply {
                onNext(defaultValue)
            }
        }
    
    private fun <T> Observable<T>.replayShare(subjectFactory: () -> Subject<T>): Observable<T> {
        val subj = AtomicReference(subjectFactory())
    
        val obs =
            object : ConnectableObservable<T> {
                override fun subscribe(observer: ObservableObserver<T>) {
                    subj.value.subscribe(observer)
                }
    
                override fun connect(onConnect: ((Disposable) -> Unit)?) {
                    val disposable = SerialDisposable()
                    onConnect?.invoke(disposable)
                    this@replayShare
                        .doOnBeforeTerminate { subj.value = subjectFactory() }
                        .subscribe(subj.value.getObserver(disposable::set))
                }
            }
    
        return obs.refCount()
    }
    saket

    saket

    1 year ago
    Awesome, thank you so much!
    Is there any value in offering this as a first party operator?
    Arkadii Ivanov

    Arkadii Ivanov

    1 year ago
    I will keep this in mind. If there will be more requests, we will consider it. Thanks!