Arkadii Ivanov
02/07/2020, 10:40 AMracePair
implementation for Maybe
. I implemented CoroutineContext
->`Scheduler` interop properly and now race conditions affect tests more often. Already spend ~8h of debugging, ended up with a different implementation that works.
So what is going on (will explain in terms of the existing RxJava2
implementation and the racePairCanJoinLeft
test):
override fun <A, B> CoroutineContext.racePair(fa: MaybeKOf<A>, fb: MaybeKOf<B>): MaybeK<RacePair<ForMaybeK, A, B>> =
asScheduler().let { scheduler ->
Maybe.create<RacePair<ForMaybeK, A, B>> { emitter ->
val sa = ReplaySubject.create<A>()
val sb = ReplaySubject.create<B>()
val dda = fa.value()
/*
* 1:
* We subscribe to "fa".
* This has "subscribeOn" operator somewhere in the upstream.
* So the subscription is asynchronous.
*/
.subscribe(sa::onNext, sa::onError)
val ddb = fb.value().subscribe(sb::onNext, sb::onError)
emitter.setCancellable {
/*
* 3:
* This is called after emission in Step 2 is done.
* Here is the race condition.
* If this is called BEFORE Step 1 is actually subscribed
* and value is delivered to "sa", then test fails with timeout.
*/
dda.dispose(); ddb.dispose()
}
val ffa = Fiber(sa.firstElement().k(), MaybeK { dda.dispose() })
val ffb = Fiber(sb.firstElement().k(), MaybeK { ddb.dispose() })
sa.subscribe({
emitter.onSuccess(RacePair.First(it, ffb))
}, { e -> emitter.tryOnError(e) }, emitter::onComplete)
sb.subscribe({
/*
* 2:
* We receive a value here and emit it to downstream.
* This eventually disposes this Maybe after emission is done.
*/
emitter.onSuccess(RacePair.Second(ffa, it))
}, { e -> emitter.tryOnError(e) }, emitter::onComplete)
}.subscribeOn(scheduler).observeOn(Schedulers.trampoline()).k()
}
So I ended up with the following implementation:
override fun <A, B> CoroutineContext.racePair(fa: Kind<ForMaybeK, A>, fb: Kind<ForMaybeK, B>): Kind<ForMaybeK, RacePair<ForMaybeK, A, B>> {
val scheduler = asScheduler()
val coa: ConnectableObservable<A> = fa.value().subscribeOn(scheduler).asObservable().replay(bufferSize = 1)
val cob: ConnectableObservable<B> = fb.value().subscribeOn(scheduler).asObservable().replay(bufferSize = 1)
lateinit var da: RxDisposable
coa.connect { da = it }
lateinit var db: RxDisposable
cob.connect { db = it }
val ma = coa.firstOrComplete()
val mb = cob.firstOrComplete()
return merge(
ma.map { RacePairResult.First(it) },
mb.map { RacePairResult.Second(it) }
)
.firstOrComplete()
.map {
when (it) {
is RacePairResult.First -> RacePair.First(winner = it.value, fiberB = Fiber(join = mb.k(), cancel = MaybeK(db::dispose)))
is RacePairResult.Second -> RacePair.Second(fiberA = Fiber(join = ma.k(), cancel = MaybeK(da::dispose)), winner = it.value)
}
}
.k()
}
private sealed class RacePairResult<out A, out B> {
class First<A>(val value: A) : RacePairResult<A, Nothing>()
class Second<B>(val value: B) : RacePairResult<Nothing, B>()
}
simon.vergauwen
02/12/2020, 10:01 AMI implementedš->`Scheduler` interop properlyCoroutineContext
Arkadii Ivanov
02/12/2020, 10:44 AMsimon.vergauwen
02/12/2020, 10:44 AM