Arkadii Ivanov
02/09/2020, 4:23 PM
fork for (at least) RxJava is broken. The following test case demonstrates the issue:
"Fork Fiber is cancelled when chain is disposed before emission of Fiber" {
  val dispatchers = MaybeK.dispatchers()
  val c =
    object : MaybeKConcurrent {
      override fun dispatchers(): Dispatchers<ForMaybeK> = dispatchers
    }
  val testScheduler = TestScheduler()
  with(c) {
    var emitter: MaybeEmitter<*>? = null
    val innerCtx = testScheduler.asCoroutineContext()
    val d =
      Maybe.create<Int> { emitter = it }
        .k()
        .fork(innerCtx)
        .continueOn(innerCtx)
        .flatMap(Fiber<ForMaybeK, Int>::cancel)
        .value()
        .subscribe()
    // Dispose the chain before the Fiber is cancelled or the Maybe is executed.
    // Commenting out this line makes the test pass.
    d.dispose()
    testScheduler.triggerActions() // Now process the Fiber emission and the Maybe execution
    assertNotNull(emitter)
    assertEquals(true, emitter?.isDisposed)
  }
}
Failure:
arrow.fx.MaybeKTests > Fork Fiber is cancelled when chain is disposed before emission of Fiber FAILED
java.lang.AssertionError: expected:<true> but was:<false>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:118)
at kotlin.test.junit.JUnitAsserter.assertEquals(JUnitSupport.kt:32)
at kotlin.test.AssertionsKt__AssertionsKt.assertEquals(Assertions.kt:51)
at kotlin.test.AssertionsKt.assertEquals(Unknown Source)
at kotlin.test.AssertionsKt__AssertionsKt.assertEquals$default(Assertions.kt:50)
at kotlin.test.AssertionsKt.assertEquals$default(Unknown Source)
at arrow.fx.MaybeKTests$1.invokeSuspend(MaybeKTests.kt:86)
at arrow.fx.MaybeKTests$1.invoke(MaybeKTests.kt)
Explanation:
override fun <A> Kind<ForMaybeK, A>.fork(coroutineContext: CoroutineContext): MaybeK<Fiber<ForMaybeK, A>> =
  coroutineContext.asScheduler().let { scheduler ->
    Maybe.create<Fiber<ForMaybeK, A>> { emitter ->
      if (!emitter.isDisposed) {
        val s: ReplaySubject<A> = ReplaySubject.create()
        // Here we subscribe to the upstream; it is active from now on.
        val conn: RxDisposable = value().subscribeOn(scheduler).subscribe(s::onNext, s::onError)
        /*
         * Here we emit the Fiber to the downstream.
         * The Fiber is the only thing that can cancel the upstream.
         * But if there is an observeOn operator somewhere downstream,
         * there is a race condition:
         * if the downstream is disposed before observeOn delivers
         * the Fiber, the Fiber never reaches its listener
         * and the upstream is never cancelled.
         */
        emitter.onSuccess(Fiber(s.firstElement().k(), MaybeK {
          conn.dispose()
        }))
      }
    }.k()
  }
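To isolate the race from RxJava and Arrow, here is a minimal sketch in plain Kotlin. Everything in it (ManualScheduler, Upstream, FiberHandle, Downstream, forkSketch, runScenario) is a hypothetical stand-in and not Arrow or RxJava API. It only models the ordering described in the comment above, under the assumption that a deferred delivery to a disposed downstream is simply dropped:

```kotlin
// All names below are hypothetical stand-ins; none of them are Arrow or RxJava types.

/** Runs queued tasks only when asked, similar in spirit to a test scheduler. */
class ManualScheduler {
    private val queue = ArrayDeque<() -> Unit>()
    fun schedule(task: () -> Unit) { queue.addLast(task) }
    fun triggerActions() { while (queue.isNotEmpty()) queue.removeFirst().invoke() }
}

/** Stands in for the upstream subscription (conn) started inside fork. */
class Upstream {
    var isDisposed = false
        private set
    fun dispose() { isDisposed = true }
}

/** Stands in for the Fiber: the only handle that can cancel the upstream. */
class FiberHandle(val cancel: () -> Unit)

/** Stands in for the downstream chain returned by subscribe(). */
class Downstream {
    var isDisposed = false
        private set
    fun dispose() { isDisposed = true }
}

fun forkSketch(
    scheduler: ManualScheduler,
    downstream: Downstream,
    upstream: Upstream,
    listener: (FiberHandle) -> Unit
) {
    // The upstream is considered active from this point on.
    val fiber = FiberHandle { upstream.dispose() }
    // observeOn-like hop (assumption): delivery is deferred, and it is dropped
    // if the downstream has been disposed by the time the task runs.
    scheduler.schedule { if (!downstream.isDisposed) listener(fiber) }
}

/** Returns whether the upstream ended up disposed. */
fun runScenario(disposeBeforeDelivery: Boolean): Boolean {
    val scheduler = ManualScheduler()
    val downstream = Downstream()
    val upstream = Upstream()
    forkSketch(scheduler, downstream, upstream) { it.cancel() }
    if (disposeBeforeDelivery) downstream.dispose()
    scheduler.triggerActions()
    return upstream.isDisposed
}

fun main() {
    println(runScenario(disposeBeforeDelivery = true))  // prints "false": upstream is left running
    println(runScenario(disposeBeforeDelivery = false)) // prints "true": Fiber was delivered and cancelled
}
```

In this model the first scenario corresponds to the failing test (the chain is disposed before the Fiber is delivered), and the second corresponds to the test with d.dispose() commented out.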
Should I submit an issue, or is this expected? Or maybe I'm missing something?