MiSikora
10/14/2021, 8:44 AM
dispatcher gets frozen after launching the job? I noticed it does not happen when channel is a global value. I’m having issues wrapping my head around it.
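// Imports are assumed (Kotlin/Native with kotlinx.coroutines); the original message omitted them.
import kotlin.coroutines.CoroutineContext
import kotlin.native.concurrent.AtomicReference
import kotlin.native.concurrent.freeze
import kotlin.native.concurrent.isFrozen
import kotlin.test.Test
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Runnable
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class FreezeTest {
    private val dispatcher = ImmediateDispatcher()
    private val callbacks = AtomicReference(emptyList<() -> Unit>().freeze())

    @Test
    fun freezeTest() = runBlocking(dispatcher) {
        println("1 isFrozen: ${dispatcher.isFrozen}") // False
        val job = launch {
            val channel = Channel<Unit>()
            // The callback captures the local channel.
            val callback: () -> Unit = { channel.trySend(Unit) }
            callbacks.update { it + callback }
            try {
                for (signal in channel) {
                    channel.send(Unit)
                }
            } finally {
                callbacks.update { it - callback }
            }
        }
        println("2 isFrozen: ${dispatcher.isFrozen}") // True
        job.cancel()
    }
}

// Compare-and-set loop; every new value is frozen before it is stored.
fun <T> AtomicReference<T>.update(func: (T) -> T) {
    while (true) {
        val old = value
        val new = func(old).freeze()
        if (compareAndSet(old, new)) break
    }
}

// Runs every block inline on the calling thread.
class ImmediateDispatcher : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) = block.run()
}
MiSikora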
10/14/2021, 8:45 AM
SQLDelight via Flow freezes this dispatcher and we can no longer delay or use timeouts.
Trevor Stone
10/14/2021, 2:12 PM
this.callbacks and so this being FreezeTest is crossing coroutine boundaries and being frozen
MiSikora
10/14/2021, 2:44 PM
FreezeTest is not being frozen though. Also, as mentioned, moving channel from a local val to a global val resolves this.
Things are being frozen because they are explicitly frozen inside AtomicReference. But this doesn’t explain why CoroutineContext is being frozen, or why it is not frozen when channel is global.
I have a workaround that fixes this, but it is ugly as hell, introduces another issue, and I still don’t understand why the whole scope/context is being frozen in this scenario.
MiSikora
10/14/2021, 2:45 PM
actual fun <T> Flow<T>.fix(context: CoroutineContext): Flow<T> = if (context[FixTokenKey] == null) {
    this
} else channelFlow {
    val fixScope = CoroutineScope(context)
    val channel = Channel<T>()
    onEach(channel::send).launchIn(fixScope)
    try {
        for (element in channel) send(element)
    } finally {
        fixScope.cancel()
    }
}

object FixToken : AbstractCoroutineContextElement(FixTokenKey) {
    override fun toString() = "FixToken"
}

private object FixTokenKey : CoroutineContext.Key<FixToken>
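
A note on the explicit freezing mentioned earlier in the thread: under the legacy Kotlin/Native memory model, freeze() is transitive, so freezing a lambda also freezes every object the lambda captures. The sketch below is only a minimal illustration of that rule, not code from the thread; Holder is a hypothetical stand-in for something a callback might capture, such as the local channel. It may help explain how freezing the callback list in update could reach captured objects, but whether this is what ends up freezing the dispatcher is not established in the discussion.

```kotlin
import kotlin.native.concurrent.freeze
import kotlin.native.concurrent.isFrozen

// Hypothetical stand-in for any object a callback might capture (e.g. a channel).
class Holder(var count: Int = 0)

fun main() {
    val holder = Holder()
    // The lambda captures holder, so holder is part of the lambda's object graph.
    val callback: () -> Int = { holder.count }

    println("before freeze: ${holder.isFrozen}")
    // Freezing the lambda walks its object graph and freezes holder as well
    // (expected under the legacy memory model).
    callback.freeze()
    println("after freeze: ${holder.isFrozen}")
}
```

By the same rule, anything reachable from an object that is later stored inside an already frozen structure would have to be frozen too, which is one way freezing can spread further than the explicit freeze() calls suggest.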