So I did pretty much everything, stuck with `raceP...
# arrow-contributors
a
So I did pretty much everything, stuck with
racePair
implementation for
Maybe
. I implemented
CoroutineContext
->`Scheduler` interop properly and now race conditions affect tests more often. Already spend ~8h of debugging, ended up with a different implementation that works. So what is going on (will explain in terms of the existing
RxJava2
implementation and the
racePairCanJoinLeft
test):
Copy code
override fun <A, B> CoroutineContext.racePair(fa: MaybeKOf<A>, fb: MaybeKOf<B>): MaybeK<RacePair<ForMaybeK, A, B>> =
    asScheduler().let { scheduler ->
      Maybe.create<RacePair<ForMaybeK, A, B>> { emitter ->
        val sa = ReplaySubject.create<A>()
        val sb = ReplaySubject.create<B>()
        val dda = fa.value()
          /*
          * 1:
          * We subscribe to "fa".
          * This has "subscribeOn" operator somewhere in the upstream.
          * So the subscription is asynchronous.
           */
          .subscribe(sa::onNext, sa::onError)
        val ddb = fb.value().subscribe(sb::onNext, sb::onError)
        emitter.setCancellable { 
          /*
           * 3:
           * This is called after emission in Step 2 is done.
           * Here is the race condition.
           * If this is called BEFORE Step 1 is actually subscribed
           * and value is delivered to "sa", then test fails with timeout.
           */
          dda.dispose(); ddb.dispose() 
        }
        val ffa = Fiber(sa.firstElement().k(), MaybeK { dda.dispose() })
        val ffb = Fiber(sb.firstElement().k(), MaybeK { ddb.dispose() })
        sa.subscribe({
          emitter.onSuccess(RacePair.First(it, ffb))
        }, { e -> emitter.tryOnError(e) }, emitter::onComplete)
        sb.subscribe({
          /*
           * 2:
           * We receive a value here and emit it to downstream.
           * This eventually disposes this Maybe after emission is done. 
           */
          emitter.onSuccess(RacePair.Second(ffa, it))
        }, { e -> emitter.tryOnError(e) }, emitter::onComplete)
      }.subscribeOn(scheduler).observeOn(Schedulers.trampoline()).k()
    }
So I ended up with the following implementation:
Copy code
override fun <A, B> CoroutineContext.racePair(fa: Kind<ForMaybeK, A>, fb: Kind<ForMaybeK, B>): Kind<ForMaybeK, RacePair<ForMaybeK, A, B>> {
    val scheduler = asScheduler()
    val coa: ConnectableObservable<A> = fa.value().subscribeOn(scheduler).asObservable().replay(bufferSize = 1)
    val cob: ConnectableObservable<B> = fb.value().subscribeOn(scheduler).asObservable().replay(bufferSize = 1)
    lateinit var da: RxDisposable
    coa.connect { da = it }
    lateinit var db: RxDisposable
    cob.connect { db = it }
    val ma = coa.firstOrComplete()
    val mb = cob.firstOrComplete()

    return merge(
      ma.map { RacePairResult.First(it) },
      mb.map { RacePairResult.Second(it) }
    )
      .firstOrComplete()
      .map {
        when (it) {
          is RacePairResult.First -> RacePair.First(winner = it.value, fiberB = Fiber(join = mb.k(), cancel = MaybeK(db::dispose)))
          is RacePairResult.Second -> RacePair.Second(fiberA = Fiber(join = ma.k(), cancel = MaybeK(da::dispose)), winner = it.value)
        }
      }
      .k()
  }

  private sealed class RacePairResult<out A, out B> {
    class First<A>(val value: A) : RacePairResult<A, Nothing>()
    class Second<B>(val value: B) : RacePairResult<Nothing, B>()
  }
s
I implemented
CoroutineContext
->`Scheduler` interop properly
😍
Is there a repo or branch where I can see some of these changes already?
a
I will draft PR soon, currently updating Reaktive with the required stuff
s
Awesome! Looking very much forward to seeing this contribution 👏 🎉
❤️ 1