Tian Tian098
09/05/2025, 6:36 PM
flow { /* do some IO and emit some results */ }
.collect { /* do some post processing */ }
If I add retry logic that calls delay() inside the body of the flow, it deadlocks whenever delay() actually gets called. More precisely, I think it deadlocks on the first IO call I make after delay() returns.
flow { /* do some IO, retry if necessary, and emit some results */ }
.collect { /* do some post processing */ }
But if I tell the flow to run on some dispatcher with flowOn, it suddenly works again:
flow { /* do some IO, retry if necessary, and emit some results */ }
.flowOn(Dispatchers.Unconfined)
.collect { /* do some post processing */ }
Why would that be the case?
Youssef Shoaib [MOD]
09/05/2025, 7:59 PM
runTest does weird things to delay
Tian Tian098
09/05/2025, 8:00 PM
suspend fun main()
Youssef Shoaib [MOD]
09/05/2025, 8:01 PM
withContext(Dispatchers.Blah). suspend fun main doesn't set up a dispatcher
Tian Tian098
09/05/2025, 8:01 PM
Youssef Shoaib [MOD]
09/05/2025, 8:03 PM
suspend fun main sets up nothing kotlinx-coroutines related. It's implemented to just run the coroutine and throw an exception if necessary (plus some Thread sleeping optimizations on the JVM). I think there is a DefaultDelay that does end up getting called, but maybe its behaviour somehow causes issues in your case, not sure.
Tian Tian098
09/05/2025, 8:05 PM
suspend fun main() = withContext(Dispatchers.Default) {
...
}
Zach Klippenstein (he/him) [MOD]
09/05/2025, 8:07 PM
Unconfined) to prevent infinite recursion and this sort of thing. So probably even doing withContext(Dispatchers.Unconfined) will fix it.
Tian Tian098
09/05/2025, 8:09 PM
Dispatchers.Unconfined also works
Tian Tian098
09/05/2025, 8:12 PM
Unconfined because its documentation says I shouldn't use it
Youssef Shoaib [MOD]
09/05/2025, 8:22 PM
runBlocking.
Tian Tian098
09/05/2025, 8:31 PM
delay() can cause thread starvation deadlock when flowOn() is absent, stemming from Flow's context preservation principle colliding with limited dispatcher thread pools. The deadlock occurs because all operations compete for the same threads, while flowOn(Dispatchers.Unconfined) solves this by bypassing traditional dispatcher queuing entirely.
Does that sound right, or is it hallucinating?
Tian Tian098
09/05/2025, 8:36 PM
suspend fun main that I may have missed?
Natasha Murashkina
09/07/2025, 11:03 AM
All the code within suspend fun main() { ... } runs on the single-threaded main dispatcher (Dispatchers.Main). Usually it is used in UI applications (Android, desktop) for UI modifications, to avoid data races when editing the UI. But it's just a regular thread, no magic attached to it.
However, delay() plays well with a single thread, since it's a suspending function, not a blocking one. It suspends and releases the underlying thread for others to enjoy. There's no inherent reason for your sample not to work on a single thread.
Natasha Murashkina
09/07/2025, 11:05 AM
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun simple() {
    flow {
        delay(10)
        emit(42)
    }.collect {
        println("hello")
    }
}

/* IO with retry example */
class SuspendingIO {
    var counter = 0

    suspend fun poll(): Int? {
        delay(100) // simulate a suspending IO call
        counter++
        return if (counter == 5) 42 else null
    }
}

suspend fun complex() {
    val io = SuspendingIO()
    flow {
        var res: Int? = null
        while (res == null) {
            res = io.poll()
            delay(10)
        }
        emit(res)
    }.collect {
        println(it)
    }
}

suspend fun main() {
    // Complex is the core idea of how suspending IO with suspending retries work.
    // Note, simple is the same as complex. It's not that complex after all.
    simple()
    complex()
}
https://pl.kotl.in/xpwi9GRPg
Natasha Murashkina
09/07/2025, 11:06 AM
flow {}.collect {} construct delays, other coroutines get to execute. Which synchronisation resource is used in flow which is also used outside of flow/collect somewhere, in some other coroutine? Looks like one of them is taking this resource from flow when it does its second IO call, and possibly blocks waiting for the flow.collect to finish. What do you use for IO? Could you provide a simplified reproducer?
While you might want to use other dispatchers for performance (for correctness, it doesn't matter) (flow { /* io calls */ }.flowOn(Dispatchers.IO).collect()), using a single thread helped you to discover this bug!
Tian Tian098
09/07/2025, 1:26 PM
Natasha Murashkina
09/07/2025, 3:23 PM
Natasha Murashkina
09/07/2025, 3:25 PM
Dispatchers.Main are doing?
Natasha Murashkina
09/07/2025, 3:29 PM
"The blocking call happens and doesn't release the thread, but eventually returns."
What do you mean by "eventually returns"? If it returns, then it's not a deadlock?
Dmitry Khalanskiy [JB]
09/11/2025, 9:40 AM
"All the code within suspend fun main() { ... } runs on the single-threaded main dispatcher (Dispatchers.Main)."
Not true. Dispatchers.Main is the UI thread, not the thread running the main function. Main can even be unavailable. For example, this code crashes if you run it in https://play.kotlinlang.org:
import kotlinx.coroutines.*
suspend fun main() {
    GlobalScope.launch(Dispatchers.Main) {
    }
}
suspend fun main is weird in that it indeed doesn't have a CoroutineDispatcher, so its dispatching behavior is strange. Example:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun main() {
    println(Thread.currentThread())
    flow {
        println(Thread.currentThread())
        delay(100)
        println(Thread.currentThread())
        emit(3)
        println(Thread.currentThread())
    }.collect {
        println(Thread.currentThread())
        println(it)
    }
}
prints
Thread[main,5,main]
Thread[main,5,main]
Thread[kotlinx.coroutines.DefaultExecutor,5,main]
Thread[kotlinx.coroutines.DefaultExecutor,5,main]
3
Thread[kotlinx.coroutines.DefaultExecutor,5,main]
DefaultExecutor is the thread that performs delays; after it's done, the execution should return to some reasonable thread, but since suspend fun main does not define a reasonable thread to run on, the execution just continues on the thread where delay happened.
I can easily believe the code deadlocked, though I can't quickly think of a way to reproduce this.
Bottom line: yes, specifying some dispatcher for suspend fun main() is how you fix this.
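A minimal sketch of that fix, under stated assumptions: FlakyIo and readWithRetry are hypothetical names made up for illustration (the original poster's IO code was never shown), and the retry policy is arbitrary. The only part taken from the thread is wrapping the body of suspend fun main in withContext(Dispatchers.Default), so that code resuming after delay() is dispatched back to a real dispatcher instead of continuing on the thread that ran the delay.

```kotlin
import java.io.IOException
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the real IO: fails on the first two calls, then succeeds.
class FlakyIo {
    private var calls = 0

    suspend fun read(): Int {
        delay(10) // simulate a suspending IO call
        calls++
        if (calls < 3) throw IOException("simulated failure on call $calls")
        return 42
    }
}

// Hypothetical helper: retries read() up to maxAttempts times,
// backing off with delay() between attempts.
fun readWithRetry(io: FlakyIo, maxAttempts: Int = 5): Flow<Int> = flow {
    for (attempt in 1..maxAttempts) {
        // Catch only around the IO call, not around emit(), so exceptions
        // thrown by the collector are never swallowed by the retry logic.
        val result = try {
            io.read()
        } catch (e: IOException) {
            null
        }
        if (result != null) {
            emit(result)
            return@flow
        }
        delay(20L * attempt) // back off before the next attempt
    }
    throw IOException("gave up after $maxAttempts attempts")
}

// Give suspend fun main an explicit dispatcher, as suggested in the thread.
suspend fun main() = withContext(Dispatchers.Default) {
    readWithRetry(FlakyIo()).collect { value ->
        println("got $value on ${Thread.currentThread().name}")
    }
}
```

Printing the thread name makes it easy to compare against the DefaultExecutor output shown earlier. Whether this resolves a given deadlock still depends on what the real IO code does, which the thread never established.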