What is the reason that `dispatcher` gets frozen a...
# kotlin-native
m
What is the reason that `dispatcher` gets frozen after launching the job? I noticed it does not happen when `channel` is a global value. I’m having issues wrapping my head around it.
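```kotlin
import kotlin.coroutines.CoroutineContext
import kotlin.native.concurrent.AtomicReference
import kotlin.native.concurrent.freeze
import kotlin.native.concurrent.isFrozen
import kotlin.test.Test
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Runnable
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class FreezeTest {
  private val dispatcher = ImmediateDispatcher()
  private val callbacks = AtomicReference(emptyList<() -> Unit>().freeze())

  @Test
  fun freezeTest() = runBlocking(dispatcher) {
    println("1 isFrozen: ${dispatcher.isFrozen}") // False
    val job = launch {
      val channel = Channel<Unit>()
      val callback: () -> Unit = { channel.trySend(Unit) }
      callbacks.update { it + callback }
      try {
        for (signal in channel) {
          channel.send(Unit)
        }
      } finally {
        callbacks.update { it - callback }
      }
    }
    println("2 isFrozen: ${dispatcher.isFrozen}") // True
    job.cancel()
  }
}

// Compare-and-set loop; the new value is frozen before it is stored.
fun <T> AtomicReference<T>.update(func: (T) -> T) {
  while (true) {
    val old = value
    val new = func(old).freeze()
    if (compareAndSet(old, new)) break
  }
}

// Runs every dispatched block immediately on the calling thread.
class ImmediateDispatcher : CoroutineDispatcher() {
  override fun dispatch(context: CoroutineContext, block: Runnable) = block.run()
}
```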
For more context, we have a native dispatcher for controlling time in our tests, which are single-threaded. However, observing data from `SQLDelight` via `Flow` freezes this dispatcher, and we can no longer delay or use timeouts.
t
I believe it is because the launch references `callbacks` directly. It is being referenced as `this.callbacks`, and so `this`, being `FreezeTest`, is crossing coroutine boundaries and being frozen.
You could try wrapping the launch in a `callbacks.let {}` and have everything inside the `let` block reference the scoped instance of `callbacks`. That might resolve the issue.
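To make the suggestion concrete, here is a minimal sketch, assuming the legacy Kotlin/Native memory model, where `freeze()` is transitive. `Owner`, `capturingThis`, and `capturingLocal` are hypothetical names, not part of the original code. It contrasts a lambda that reads a property (and therefore captures `this`) with one that captures only a local copy of that property.

```kotlin
import kotlin.native.concurrent.freeze
import kotlin.native.concurrent.isFrozen

// Hypothetical class used only for illustration.
class Owner {
    private val items = listOf("a")

    // Reads `items` through the implicit `this`, so the lambda captures the Owner.
    fun capturingThis(): () -> Int = { items.size }

    // Copies the property into a local first, so the lambda captures only the list.
    fun capturingLocal(): () -> Int {
        val local = items
        return { local.size }
    }
}

fun main() {
    val a = Owner()
    a.capturingThis().freeze()
    // On the legacy memory model this is expected to report the owner as frozen.
    println("captured this, owner isFrozen: ${a.isFrozen}")

    val b = Owner()
    b.capturingLocal().freeze()
    // Here only the list was reachable from the lambda, not the owner.
    println("captured local, owner isFrozen: ${b.isFrozen}")
}
```

Newer Kotlin/Native releases deprecate the freezing API, so this sketch is only meaningful on compiler versions that still use the legacy memory model.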
m
`FreezeTest` is not being frozen, though. Also, as mentioned, moving `channel` from a local val to a global val resolves this. Things are being frozen because they are explicitly frozen inside `AtomicReference`. But this doesn’t explain why `CoroutineContext` is being frozen, or why it is not frozen when `channel` is global. I have a workaround that fixes this, but it is ugly as hell, introduces another issue, and I still don’t understand why the whole scope/context is being frozen in this scenario.
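As a small illustration of the explicit freezing mentioned above, the sketch below assumes the legacy Kotlin/Native memory model. `Captured` and `frozenBeforeAndAfter` are hypothetical names standing in for the local `channel` and the `update` call. It shows that freezing a list of callbacks is expected to also freeze whatever those callbacks capture. It does not by itself explain how the freeze reaches the dispatcher, which is the open question here.

```kotlin
import kotlin.native.concurrent.freeze
import kotlin.native.concurrent.isFrozen

// Hypothetical stand-in for an object captured by a callback (e.g. a Channel).
class Captured

// Returns the frozen state of the captured object before and after
// the list holding the callback is frozen.
fun frozenBeforeAndAfter(): Pair<Boolean, Boolean> {
    val captured = Captured()
    val callback: () -> Unit = { println(captured) }

    val before = captured.isFrozen
    // Mirrors `func(old).freeze()` in `update`: the whole list is frozen,
    // which walks into each lambda and everything it references.
    (emptyList<() -> Unit>() + callback).freeze()
    val after = captured.isFrozen

    return before to after
}

fun main() {
    val (before, after) = frozenBeforeAndAfter()
    println("before: $before, after: $after")
}
```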
For anyone interested, below is the “fix”. We pass the fix token in tests to apply the logic.
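```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach

// Only re-routes the flow when the fix token is present in the context.
actual fun <T> Flow<T>.fix(context: CoroutineContext): Flow<T> = if (context[FixTokenKey] == null) {
    this
} else channelFlow {
    val fixScope = CoroutineScope(context)
    val channel = Channel<T>()

    // Collect the upstream flow in a separate scope and forward through the channel.
    onEach(channel::send).launchIn(fixScope)

    try {
        for (element in channel) send(element)
    } finally {
        fixScope.cancel()
    }
}

object FixToken : AbstractCoroutineContextElement(FixTokenKey) {
    override fun toString() = "FixToken"
}

private object FixTokenKey : CoroutineContext.Key<FixToken>
```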