#arrow-contributors

Arkadii Ivanov

02/07/2020, 4:40 PM
OK, here is another problem. In `maybek.kt` we have the following function:
```kotlin
override fun <A, B> CoroutineContext.racePair(fa: MaybeKOf<A>, fb: MaybeKOf<B>): MaybeK<RacePair<ForMaybeK, A, B>> =
    asScheduler().let { scheduler ->
      Maybe.create<RacePair<ForMaybeK, A, B>> { emitter ->
        val sa = ReplaySubject.create<A>()
        val sb = ReplaySubject.create<B>()
        val dda = fa.value().subscribe(sa::onNext, sa::onError)
        val ddb = fb.value().subscribe(sb::onNext, sb::onError)
        emitter.setCancellable { dda.dispose(); ddb.dispose() } // <-- This is the line in question
        val ffa = Fiber(sa.firstElement().k(), MaybeK { dda.dispose() })
        val ffb = Fiber(sb.firstElement().k(), MaybeK { ddb.dispose() })
        sa.subscribe({
          emitter.onSuccess(RacePair.First(it, ffb))
        }, { e -> emitter.tryOnError(e) }, emitter::onComplete)
        sb.subscribe({
          emitter.onSuccess(RacePair.Second(ffa, it))
        }, { e -> emitter.tryOnError(e) }, emitter::onComplete)
      }.subscribeOn(scheduler).observeOn(Schedulers.trampoline()).k()
    }
```
I marked the line that I would like to discuss. I don't think it is correct to dispose BOTH subscriptions when the main stream succeeds. If a `Winner` produces a value, the main `Maybe` completes and this callback is executed, so the `Loser` is terminated as well. In my `Reaktive` implementation I fixed this, but now `racePairCanCancelsLoser` fails because of this line:
```kotlin
val loser = s.release().bracket(use = { never<String>() }, release = { p.complete(i) })
```
The `Loser` never completes because no one cancels it, and since it is `never`, it does not complete by itself. Such a change fixes the test and looks correct to me. What do you think?
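A minimal sketch of the behavior in question, assuming RxJava 2 (`io.reactivex`) is on the classpath: the callback registered with `setCancellable` runs not only when the subscriber disposes, but also right after `onSuccess`. The function name `cancellableRunsAfterSuccess` is made up for illustration and is not part of Arrow or RxJava.

```kotlin
import io.reactivex.Maybe
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper: returns the emitted value and whether the
// cancellable registered on the emitter was invoked.
fun cancellableRunsAfterSuccess(): Pair<String, Boolean> {
    val cancelled = AtomicBoolean(false)
    val value = Maybe.create<String> { emitter ->
        // Same pattern as the line in question: cleanup registered up front.
        emitter.setCancellable { cancelled.set(true) }
        // The "winner" produces a value; nobody disposes the Maybe explicitly.
        emitter.onSuccess("winner")
    }.blockingGet()
    return value to cancelled.get()
}

fun main() {
    val (value, cancelled) = cancellableRunsAfterSuccess()
    println("value=$value, cancellable invoked=$cancelled")
    // prints "value=winner, cancellable invoked=true"
}
```

In `racePair` this means that a cancellable which disposes both `dda` and `ddb` also disposes the loser as soon as the winner emits.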
Correction: no changes to this test are required, but the rx2 implementation of `racePair` still looks incorrect.
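One possible direction, sketched under heavy assumptions: guard the cancellable with a flag so that both sources are disposed only when the race is cancelled before anyone wins. This is not the Arrow or Reaktive code. `Won` and `raceKeepingLoser` are hypothetical names, both sides share one type `A` for brevity, and plain RxJava 2 types stand in for `MaybeK`, `Fiber`, and `RacePair`.

```kotlin
import io.reactivex.Maybe
import io.reactivex.disposables.Disposable
import io.reactivex.disposables.SerialDisposable
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical simplified result: the winner's value plus a handle to cancel the loser.
data class Won<A>(val value: A, val cancelLoser: Disposable)

fun <A : Any> raceKeepingLoser(fa: Maybe<A>, fb: Maybe<A>): Maybe<Won<A>> =
    Maybe.create<Won<A>> { emitter ->
        val hasWinner = AtomicBoolean(false)
        // Containers created up front so each side can refer to the other.
        val da = SerialDisposable()
        val db = SerialDisposable()
        // Dispose both sources only if no winner has emitted yet.
        emitter.setCancellable {
            if (!hasWinner.get()) {
                da.dispose()
                db.dispose()
            }
        }
        // Empty completion of a source is ignored in this sketch.
        da.set(fa.subscribe(
            { a -> if (hasWinner.compareAndSet(false, true)) emitter.onSuccess(Won(a, db)) },
            { e -> emitter.tryOnError(e) }
        ))
        db.set(fb.subscribe(
            { b -> if (hasWinner.compareAndSet(false, true)) emitter.onSuccess(Won(b, da)) },
            { e -> emitter.tryOnError(e) }
        ))
    }

fun main() {
    val loserDisposed = AtomicBoolean(false)
    val loser = Maybe.never<String>().doOnDispose { loserDisposed.set(true) }
    val won = raceKeepingLoser(Maybe.just("a"), loser).blockingGet()
    println("winner=${won.value}, loser disposed=${loserDisposed.get()}")
    won.cancelLoser.dispose() // the caller decides when the loser is cancelled
    println("loser disposed after explicit cancel=${loserDisposed.get()}")
}
```

With this shape the loser keeps running after the winner emits and is stopped only through the returned handle, which is what a test like `racePairCanCancelsLoser` appears to rely on.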