# arrow-contributors
Also, I believe the current implementation of `fork` for (at least) RxJava is broken. The following test case demonstrates the issue:
"Fork Fiber is cancelled when chain is disposed before emission of Fiber" {
      val dispatchers = MaybeK.dispatchers()

      val c =
        object : MaybeKConcurrent {
          override fun dispatchers(): Dispatchers<ForMaybeK> = dispatchers
        }

      val testScheduler = TestScheduler()

      with(c) {
        var emitter: MaybeEmitter<*>? = null
        val innerCtx = testScheduler.asCoroutineContext()

        val d =
          Maybe.create<Int> { emitter = it }
            .k()
            .fork(innerCtx)
            .continueOn(innerCtx)
            .flatMap(Fiber<ForMaybeK, Int>::cancel)
            .value()
            .subscribe()

        d.dispose() // Dispose the chain before the Fiber is cancelled or the Maybe is executed; commenting out this line makes the test pass
        testScheduler.triggerActions() // Now process the Fiber emission and Maybe execution

        assertNotNull(emitter)
        assertEquals(true, emitter?.isDisposed)
      }
    }
Failure:
arrow.fx.MaybeKTests > Fork Fiber is cancelled when chain is disposed before emission of Fiber FAILED
    java.lang.AssertionError: expected:<true> but was:<false>
        at org.junit.Assert.fail(Assert.java:88)
        at org.junit.Assert.failNotEquals(Assert.java:834)
        at org.junit.Assert.assertEquals(Assert.java:118)
        at kotlin.test.junit.JUnitAsserter.assertEquals(JUnitSupport.kt:32)
        at kotlin.test.AssertionsKt__AssertionsKt.assertEquals(Assertions.kt:51)
        at kotlin.test.AssertionsKt.assertEquals(Unknown Source)
        at kotlin.test.AssertionsKt__AssertionsKt.assertEquals$default(Assertions.kt:50)
        at kotlin.test.AssertionsKt.assertEquals$default(Unknown Source)
        at arrow.fx.MaybeKTests$1.invokeSuspend(MaybeKTests.kt:86)
        at arrow.fx.MaybeKTests$1.invoke(MaybeKTests.kt)
Explanation:
override fun <A> Kind<ForMaybeK, A>.fork(coroutineContext: CoroutineContext): MaybeK<Fiber<ForMaybeK, A>> =
    coroutineContext.asScheduler().let { scheduler ->
      Maybe.create<Fiber<ForMaybeK, A>> { emitter ->
        if (!emitter.isDisposed) {
          val s: ReplaySubject<A> = ReplaySubject.create()

          // Here we subscribe to the upstream; it is active from now on
          val conn: RxDisposable = value().subscribeOn(scheduler).subscribe(s::onNext, s::onError)

          /*
           * Here we emit the Fiber to the downstream.
           * The Fiber is the only handle that can cancel the upstream.
           * But if there is an observeOn operator somewhere down the stream,
           * then there is a race condition:
           * if the downstream is disposed before observeOn processes
           * the emission of the Fiber, the Fiber never reaches its listener
           * and the upstream is never cancelled.
           */
          emitter.onSuccess(Fiber(s.firstElement().k(), MaybeK {
            conn.dispose()
          }))
        }
      }.k()
    }
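To make the race in the comment above easier to see in isolation, here is a toy model in plain Kotlin. It is only a sketch and uses no RxJava or Arrow APIs: `ManualScheduler`, `Flag` and `runForkModel` are hypothetical names invented for illustration. The assumption is that the asynchronous boundary (the role `continueOn`/`observeOn` plays in the test) queues the delivery of the Fiber and drops it if the downstream has already been disposed.

```kotlin
// Toy model only: none of these types are RxJava or Arrow APIs.

/** Runs queued tasks only when triggerActions() is called, similar in spirit to a test scheduler. */
class ManualScheduler {
    private val queue = ArrayDeque<() -> Unit>()
    fun schedule(task: () -> Unit) { queue.addLast(task) }
    fun triggerActions() { while (queue.isNotEmpty()) queue.removeFirst().invoke() }
}

/** Minimal cancellation flag standing in for a disposable. */
class Flag {
    var isDisposed = false
        private set
    fun dispose() { isDisposed = true }
}

/**
 * Models fork, then an asynchronous boundary, then a listener that cancels the fiber.
 * Returns the upstream connection so the caller can inspect it afterwards.
 */
fun runForkModel(disposeBeforeDelivery: Boolean): Flag {
    val scheduler = ManualScheduler()
    val upstream = Flag()   // the forked work, already subscribed and running
    val downstream = Flag() // the subscription handed back to the caller

    // The "fiber" is reduced to its cancel action; it is the only handle on upstream.
    val cancelFiber: () -> Unit = { upstream.dispose() }

    // Asynchronous boundary: delivery is queued and skipped if downstream is already disposed.
    scheduler.schedule {
        if (!downstream.isDisposed) cancelFiber()
    }

    if (disposeBeforeDelivery) downstream.dispose()
    scheduler.triggerActions()
    return upstream
}

fun main() {
    println(runForkModel(disposeBeforeDelivery = false).isDisposed) // true
    println(runForkModel(disposeBeforeDelivery = true).isDisposed)  // false
}
```

In the second call the upstream is left running even though the whole chain was disposed, which matches what the failing assertion reports.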
Should I submit an issue, or is this expected? Or maybe I'm missing something?