Arkadii Ivanov
02/07/2020, 4:40 PM
In maybek.kt we have the following function:
override fun <A, B> CoroutineContext.racePair(fa: MaybeKOf<A>, fb: MaybeKOf<B>): MaybeK<RacePair<ForMaybeK, A, B>> =
  asScheduler().let { scheduler ->
    Maybe.create<RacePair<ForMaybeK, A, B>> { emitter ->
      val sa = ReplaySubject.create<A>()
      val sb = ReplaySubject.create<B>()
      val dda = fa.value().subscribe(sa::onNext, sa::onError)
      val ddb = fb.value().subscribe(sb::onNext, sb::onError)
      emitter.setCancellable { dda.dispose(); ddb.dispose() } // <-- This is the line in question
      val ffa = Fiber(sa.firstElement().k(), MaybeK { dda.dispose() })
      val ffb = Fiber(sb.firstElement().k(), MaybeK { ddb.dispose() })
      sa.subscribe({
        emitter.onSuccess(RacePair.First(it, ffb))
      }, { e -> emitter.tryOnError(e) }, emitter::onComplete)
      sb.subscribe({
        emitter.onSuccess(RacePair.Second(ffa, it))
      }, { e -> emitter.tryOnError(e) }, emitter::onComplete)
    }.subscribeOn(scheduler).observeOn(Schedulers.trampoline()).k()
  }
I marked the line that I would like to discuss. I don't think it is correct to dispose BOTH subscriptions when the main stream succeeds. If a Winner produces a value, the main Maybe completes and this callback gets executed, so the Loser terminates as well. In my Reaktive implementation I fixed this, but now racePairCanCancelsLoser fails since:
val loser = s.release().bracket(use = { never<String>() }, release = { p.complete(i) })
The Loser never completes because no one cancels it and it never completes by itself.
Such a change fixes the test and looks. What do you think?
Arkadii Ivanov
02/09/2020, 12:16 AM
racePair still looks incorrect.
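For anyone following along, here is a minimal stdlib-only sketch of the concern raised in the first message. It assumes, as described there, that the cancellable registered on the emitter also runs when the emitter succeeds. `Task`, `ModelEmitter`, `RaceResult`, `race` and the `guardOnSuccess` flag are hypothetical stand-ins, not RxJava, Arrow, or Reaktive APIs, and the guarded variant is only one possible shape of a fix, not necessarily the change made in Reaktive.

```kotlin
// Hypothetical stand-in for a subscription that can be disposed.
class Task {
    var disposed = false
        private set

    fun dispose() { disposed = true }
}

// Hypothetical model of an emitter whose cancellable runs on termination,
// including success (the behaviour described in the thread).
class ModelEmitter {
    private var cancellable: (() -> Unit)? = null
    var succeeded = false
        private set

    fun setCancellable(action: () -> Unit) { cancellable = action }

    fun onSuccess() {
        if (succeeded) return
        succeeded = true
        cancellable?.invoke()
        cancellable = null
    }
}

// The loser plus the action a Fiber's cancel would run for it.
class RaceResult(val loser: Task, val cancelLoser: () -> Unit)

fun race(guardOnSuccess: Boolean): RaceResult {
    val winner = Task()
    val loser = Task() // models never(): it does not finish on its own
    val emitter = ModelEmitter()
    emitter.setCancellable {
        // Unguarded: dispose both sides unconditionally.
        // Guarded (assumption): skip disposal once a winner has been emitted.
        if (!(guardOnSuccess && emitter.succeeded)) {
            winner.dispose()
            loser.dispose()
        }
    }
    emitter.onSuccess() // the winner produces a value
    return RaceResult(loser, cancelLoser = { loser.dispose() })
}

fun main() {
    val unguarded = race(guardOnSuccess = false)
    println("unguarded, after win: loser disposed = ${unguarded.loser.disposed}") // true

    val guarded = race(guardOnSuccess = true)
    println("guarded, after win: loser disposed = ${guarded.loser.disposed}") // false
    guarded.cancelLoser()
    println("guarded, after fiber cancel: loser disposed = ${guarded.loser.disposed}") // true
}
```

In the guarded variant the loser stays alive after the winner emits, so a loser that never completes only terminates once something calls its fiber's cancel action. That matches the situation described for the racePairCanCancelsLoser test.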