# coroutines
t
I have a Kotlin flow:
```kotlin
flow { /* do some IO and emit some results */ }
  .collect { /* do some post processing */ }
```
If I introduce some retry logic using `delay()` into the body of the flow, it deadlocks whenever `delay()` gets called. More precisely, the deadlock happens some time after `delay` is called; I think it's the first time I do some IO after `delay` returns.
```kotlin
flow { /* do some IO, retry if necessary, and emit some results */ }
  .collect { /* do some post processing */ }
```
But if I tell the flow to run on some dispatcher with `flowOn`, it suddenly works again:
```kotlin
flow { /* do some IO, retry if necessary, and emit some results */ }
  .flowOn(Dispatchers.Unconfined)
  .collect { /* do some post processing */ }
```
Why would that be the case?
y
Are you running in a test? `runTest` does weird things to `delay`.
t
I'm running in a `suspend fun main()`.
y
Try surrounding it all in `withContext(Dispatchers.Blah)`. `suspend fun main` doesn't set up a dispatcher.
t
Yes... that works as well. Can you tell me more about "main doesn't set up a dispatcher"?
y
So `suspend fun main` sets up nothing `kotlinx-coroutines`-related. It's implemented to just run the coroutine and throw an exception if necessary (plus some thread-sleeping optimizations on the JVM). I think there is a `DefaultDelay` that does end up getting called, but maybe its behaviour somehow causes issues in your case; I'm not sure.
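A quick way to see what "sets up nothing" means: a coroutine's dispatcher is stored in its context under the `ContinuationInterceptor` key. The sketch below assumes the JVM, and `currentInterceptor` is a helper name made up for illustration; it reads that key in a bare `suspend fun main` and again inside `withContext(Dispatchers.Default)`.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.ContinuationInterceptor

// Hypothetical helper: returns the dispatcher (interceptor) of the calling coroutine, if any.
suspend fun currentInterceptor(): ContinuationInterceptor? =
    currentCoroutineContext()[ContinuationInterceptor]

suspend fun main() {
    // Bare suspend fun main: the coroutine has no dispatcher, so this prints "null".
    println(currentInterceptor())
    withContext(Dispatchers.Default) {
        // Inside withContext the Default dispatcher is part of the context.
        println(currentInterceptor())
    }
}
```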
t
So is it good practice to write
```kotlin
suspend fun main() = withContext(Dispatchers.Default) {
  ...
}
```
z
The dispatchers machinery has special handling for dispatchers that don't actually dispatch (like `Unconfined`) to prevent infinite recursion and this sort of thing. So probably even doing `withContext(Dispatchers.Unconfined)` will fix it.
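The "doesn't actually dispatch" distinction is visible through the public `CoroutineDispatcher.isDispatchNeeded` method. A minimal sketch comparing `Dispatchers.Default` and `Dispatchers.Unconfined`; it only shows the flag, not the internal recursion guard mentioned above:

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.EmptyCoroutineContext

fun main() {
    // A thread-pool dispatcher asks for every resumption to go through its queue.
    println(Dispatchers.Default.isDispatchNeeded(EmptyCoroutineContext))    // true
    // Unconfined never dispatches: the coroutine resumes on whichever thread resumed it.
    println(Dispatchers.Unconfined.isDispatchNeeded(EmptyCoroutineContext)) // false
}
```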
t
I confirm that `Dispatchers.Unconfined` also works.
I'm scared of using `Unconfined` because its documentation says I shouldn't use it.
y
I think going with the `Default` dispatcher is the move then, or even `runBlocking`.
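A minimal sketch of the `runBlocking` variant with the retry logic inside the flow. `FlakySource` and `resultFlow` are hypothetical stand-ins for the real IO; the point is that `runBlocking` gives `main` an event loop on the main thread, so the flow resumes there after each `delay`:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical flaky IO source: returns null until the third attempt.
class FlakySource {
    var attempts = 0
    fun fetchOrNull(): Int? = if (++attempts < 3) null else 42
}

// Emits a single value, retrying with a suspending back-off until the source answers.
fun resultFlow(source: FlakySource): Flow<Int> = flow {
    while (true) {
        val result = source.fetchOrNull()
        if (result != null) {
            emit(result)
            return@flow
        }
        delay(10) // suspends (does not block) before the next attempt
    }
}

// runBlocking installs an event loop on the main thread, so execution
// comes back to that thread after every delay().
fun main() = runBlocking {
    resultFlow(FlakySource()).collect { value ->
        println("got $value on ${Thread.currentThread().name}")
    }
}
```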
t
Claude told me:
> Kotlin Flow's retry logic with `delay()` can cause thread starvation deadlock when `flowOn()` is absent, stemming from Flow's context preservation principle colliding with limited dispatcher thread pools. The deadlock occurs because all operations compete for the same threads (Stack Overflow), while `flowOn(Dispatchers.Unconfined)` solves this by bypassing traditional dispatcher queuing entirely.

Does that sound right, or is it hallucinating?
Is there any documentation/guidance on `suspend fun main` that I may have missed?
n
All the code within `suspend fun main() { ... }` runs on the single-threaded main dispatcher (`Dispatchers.Main`). It is usually used in UI applications (Android, desktop) for UI modifications, to avoid data races when editing the UI. But it's just a regular thread, no magic attached to it. And `delay()` plays well with a single thread, since it's a suspending function, not a blocking one: it suspends and releases the underlying thread for others to enjoy. There's no inherent reason for your sample not to work on a single thread.
Here's a simplification of suspending IO with retries.
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun simple() {
    flow {
        delay(10)
        emit(42)
    }.collect {
        println("hello")
    }
}

/* IO with retry example */

class SuspendingIO {
    var counter = 0
    suspend fun poll(): Int? {
        delay(100) // simulate a suspending IO call
        counter++
        return if (counter == 5) 42 else null
    }
}

suspend fun complex() {
    val io = SuspendingIO()
    flow {
        var res: Int? = null
        while (res == null) {
            res = io.poll()
            delay(10) // suspending back-off between attempts
        }
        emit(res)
    }.collect {
        println(it)
    }
}

suspend fun main() {
    // complex() shows the core idea of how suspending IO with suspending retries works.
    // Note: simple() is essentially the same as complex(). It's not that complex after all.
    simple()
    complex()
}
```
https://pl.kotl.in/xpwi9GRPg
Something else is causing the deadlock. If it happens before the first emit takes place, the code inside `collect` hasn't had a chance to run yet. When your `flow {}.collect {}` construct delays, other coroutines get to execute. Which synchronisation resource used in the flow is also used outside of flow/collect, in some other coroutine? It looks like one of them takes this resource away from the flow by the time the flow makes its second IO call, and possibly blocks waiting for `flow.collect` to finish. What do you use for IO? Could you provide a simplified reproducer? You might want to use other dispatchers for performance (`flow { /* io calls */ }.flowOn(Dispatchers.IO).collect()`), but for correctness it doesn't matter, and using a single thread helped you discover this bug!
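A sketch of that `flowOn(Dispatchers.IO)` suggestion applied to a blocking read. `readPageBlocking` and `rows` are hypothetical placeholders; `Thread.sleep` mimics a synchronous client call and is not any real client's API:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical blocking read of one page of rows.
// Thread.sleep blocks the calling thread, just as a synchronous client would.
fun readPageBlocking(page: Int): List<String> {
    Thread.sleep(50)
    return List(2) { "row ${page * 2 + it}" }
}

// The blocking reads run on Dispatchers.IO; collection stays in the caller's context.
fun rows(pages: Int): Flow<String> = flow {
    for (page in 0 until pages) {
        // forEach is inline, so calling the suspending emit() inside it is allowed.
        readPageBlocking(page).forEach { emit(it) }
    }
}.flowOn(Dispatchers.IO)

fun main() = runBlocking {
    rows(pages = 3).collect { println("$it collected on ${Thread.currentThread().name}") }
}
```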
t
Thank you for your thoughtful answer! To generate the flow, I am reading from a BigQuery table, a few rows at a time. It occurs to me that this is a blocking operation, since BigQuery doesn't offer an asynchronous client yet. To collect the flow, I'm emitting to an infinite channel, so that's not blocking at all. It must be the blocking during the BQ read that's causing the deadlock, but I can't imagine why the delay would cause that to happen. Delay suspends and releases the thread, allowing the blocking call to happen. The blocking call happens and doesn't release the thread, but eventually returns. So what's the problem?
n
NB: emitting to a Kotlin channel is never blocking. It can only suspend: when there's no buffer space left or, for rendezvous channels (0 buffer), when there's no receiver to take the element. But it's not really important what's happening in `collect` here, since `collect` doesn't execute until the element is emitted.
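Those buffer rules can be checked without suspending at all by using `trySend`, which reports failure in exactly the cases where `send` would suspend. A small sketch:

```kotlin
import kotlinx.coroutines.channels.Channel

fun main() {
    // Unlimited buffer: send never suspends, so trySend succeeds while the channel is open.
    val unlimited = Channel<Int>(Channel.UNLIMITED)
    println(unlimited.trySend(1).isSuccess)  // true

    // Buffer of 1: the second element finds no buffer space (send would suspend here).
    val buffered = Channel<Int>(1)
    println(buffered.trySend(1).isSuccess)   // true
    println(buffered.trySend(2).isSuccess)   // false

    // Rendezvous (capacity 0): with no receiver waiting, the element cannot be handed over.
    val rendezvous = Channel<Int>(Channel.RENDEZVOUS)
    println(rendezvous.trySend(1).isSuccess) // false
}
```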
What are the other coroutines running on `Dispatchers.Main` doing?
> The blocking call happens and doesn't release the thread, but eventually returns.
What do you mean by "eventually returns"? If it returns, then it's not a deadlock?
d
> All the code within `suspend fun main() { ... }` runs on the single-threaded main dispatcher (`Dispatchers.Main`).
Not true. `Dispatchers.Main` is the UI thread, not the thread running the `main` function. `Main` can even be unavailable. For example, this code crashes if you run it in https://play.kotlinlang.org:
```kotlin
import kotlinx.coroutines.*
suspend fun main() {
    GlobalScope.launch(Dispatchers.Main) {
    }
}
```
`suspend fun main` is weird in that it indeed doesn't have a `CoroutineDispatcher`, so its dispatching behavior is strange. Example:
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun main() {
    println(Thread.currentThread())
    flow {
        println(Thread.currentThread())
        delay(100)
        println(Thread.currentThread())
        emit(3)
        println(Thread.currentThread())
    }.collect {
        println(Thread.currentThread())
        println(it)
    }
}
```
prints
```
Thread[main,5,main]
Thread[main,5,main]
Thread[kotlinx.coroutines.DefaultExecutor,5,main]
Thread[kotlinx.coroutines.DefaultExecutor,5,main]
3
Thread[kotlinx.coroutines.DefaultExecutor,5,main]
```
`DefaultExecutor` is the thread that performs delays. After it's done, execution should return to some reasonable thread, but since `suspend fun main` does not define a reasonable thread to run on, execution just continues on the thread where `delay` happened. I can easily believe the code deadlocked, though I can't quickly think of a way to reproduce this. Bottom line: yes, specifying some dispatcher for `suspend fun main()` is how you fix this.
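A sketch of that fix applied to the thread-printing example. `threadAfterDelay` is a hypothetical helper that reports which thread runs the flow body after `delay`; once `Dispatchers.Default` is installed it should be one of the Default pool's workers (named `DefaultDispatcher-worker-N` on the JVM):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: the name of the thread executing the flow body right after delay().
suspend fun threadAfterDelay(): String = flow {
    delay(100)
    emit(Thread.currentThread().name)
}.first()

suspend fun main() {
    // No dispatcher: execution continues on whatever thread completed the delay.
    println(threadAfterDelay())
    withContext(Dispatchers.Default) {
        // With a dispatcher: execution is dispatched back to a Default worker thread.
        println(threadAfterDelay())
    }
}
```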