MiSikora
10/14/2021, 8:44 AM
dispatcher gets frozen after launching the job? I noticed it does not happen when channel is a global value. I’m having issues wrapping my head around it.
class FreezeTest {
    private val dispatcher = ImmediateDispatcher()
    private val callbacks = AtomicReference(emptyList<() -> Unit>().freeze())

    @Test
    fun freezeTest() = runBlocking(dispatcher) {
        println("1 isFrozen: ${dispatcher.isFrozen}") // False
        val job = launch {
            val channel = Channel<Unit>()
            val callback: () -> Unit = { channel.trySend(Unit) }
            callbacks.update { it + callback }
            try {
                for (signal in channel) {
                    channel.send(Unit)
                }
            } finally {
                callbacks.update { it - callback }
            }
        }
        println("2 isFrozen: ${dispatcher.isFrozen}") // True
        job.cancel()
    }
}

fun <T> AtomicReference<T>.update(func: (T) -> T) {
    while (true) {
        val old = value
        val new = func(old).freeze()
        if (compareAndSet(old, new)) break
    }
}

class ImmediateDispatcher : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) = block.run()
}
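A minimal sketch of what the freeze() call inside update does to a callback, assuming the legacy Kotlin/Native memory model that this thread is about. Captured is a hypothetical stand-in for whatever the lambda closes over (in the test above, the local channel). It only shows that freezing is transitive. It does not establish that this is the path by which the dispatcher ends up frozen.

```kotlin
import kotlin.native.concurrent.freeze
import kotlin.native.concurrent.isFrozen

// Hypothetical stand-in for any object a callback closes over.
class Captured

fun main() {
    val captured = Captured()
    // The lambda holds a reference to `captured`.
    val callback: () -> Unit = { println(captured) }
    println("before: ${captured.isFrozen}") // false

    // Same shape as update(): the new list is frozen, and freeze() walks
    // every object reachable from it, including the lambda's captures.
    val callbacks = (emptyList<() -> Unit>() + callback).freeze()
    println("callbacks: ${callbacks.size}")
    println("after: ${captured.isFrozen}") // true under the legacy memory model
}
```

Anything reachable from the captured object is frozen along with it, so the size of the frozen graph depends on what the lambda references at the moment update runs.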
MiSikora
10/14/2021, 8:45 AM
SQLDelight via Flow freezes this dispatcher and we can no longer delay or use timeouts.
Trevor Stone
10/14/2021, 2:12 PM
this.callbacks, and so this, being FreezeTest, is crossing coroutine boundaries and being frozen.
Trevor Stone
10/14/2021, 2:12 PM
MiSikora
10/14/2021, 2:44 PM
FreezeTest is not being frozen though. Also, as mentioned, moving channel out of local val to global val resolves this.
Things are being frozen because they are explicitly frozen inside AtomicReference. But this doesn’t explain why CoroutineContext is being frozen. And why it is not being frozen when channel is global.
I have a workaround that fixes this but it is ugly as hell, introduces another issue, and I still don’t understand why the whole scope/context are being frozen in this scenario.
MiSikora
10/14/2021, 2:45 PM
MiSikora
10/14/2021, 2:45 PM
actual fun <T> Flow<T>.fix(context: CoroutineContext): Flow<T> = if (context[FixTokenKey] == null) {
    this
} else channelFlow {
    val fixScope = CoroutineScope(context)
    val channel = Channel<T>()
    onEach(channel::send).launchIn(fixScope)
    try {
        for (element in channel) send(element)
    } finally {
        fixScope.cancel()
    }
}

object FixToken : AbstractCoroutineContextElement(FixTokenKey) {
    override fun toString() = "FixToken"
}

private object FixTokenKey : CoroutineContext.Key<FixToken>
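A small sketch of how the context[FixTokenKey] == null check in the workaround behaves, using only the standard kotlin.coroutines types. Marker and MarkerKey are hypothetical names that mirror FixToken and FixTokenKey. How the real token is added to a context is not shown in this thread, so the plus call below is an assumption.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical marker element, same shape as FixToken / FixTokenKey.
object MarkerKey : CoroutineContext.Key<Marker>

object Marker : AbstractCoroutineContextElement(MarkerKey) {
    override fun toString() = "Marker"
}

// True when the context carries the marker, i.e. when the lookup by key is non-null.
fun hasMarker(context: CoroutineContext): Boolean = context[MarkerKey] != null

fun main() {
    val plain: CoroutineContext = EmptyCoroutineContext
    val marked: CoroutineContext = plain + Marker // assumed way of opting in

    println(hasMarker(plain))  // false
    println(hasMarker(marked)) // true
}
```

With this shape, a caller that never adds the marker gets the original flow back unchanged, and only contexts that opt in take the channelFlow branch.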