Mark
07/17/2024, 6:52 AM/**
* Run a block of suspending code with the following conditions:
*
* 1. If the block is currently running, wait for it to complete and return its result.
* 2. If the block is not currently running, launch it and return its result.
*/
class GuardedRunner<T>(
private val block: suspend () -> T,
) {
private val mutex = Mutex()
private var currentTask: Deferred<T>? = null
suspend fun run(): T {
return mutex.withLock {
// Check if there is an ongoing task
if (currentTask == null) {
// Start a new task
currentTask = CoroutineScope(Dispatchers.Default).async {
try {
block()
} finally {
// Ensure the task is cleared once it completes
// NOTE: if this is wrapped in a withLock then we get a deadlock
mutex.withLock { currentTask = null }
}
}
}
currentTask!!.await()
}
}
}
I thought maybe the reason for the deadlock is because withLock
needs to be called in the same thread in both cases, but if I use Dispatchers.Main
for both calls it doesn’t help. EDIT: I see now that Mutex
is not reentrant so that explains why this hypothesis doesn’t help.Mark
07/17/2024, 7:13 AMclass GuardedRunner<T>(
private val block: suspend () -> T,
) {
private val mutex = Mutex()
private var currentTask: Deferred<T>? = null
suspend fun run(): T {
val task = mutex.withLock {
ensureTask {
mutex.withLock {
// Ensure the task is cleared once it completes
currentTask = null
}
}
}
return task.await()
}
private fun ensureTask(
onCompletion: suspend () -> Unit,
): Deferred<T> {
return currentTask ?: CoroutineScope(Dispatchers.Default).async {
try {
block()
} finally {
onCompletion()
}
}.also {
currentTask = it
}
}
}
Daniel Pitts
07/17/2024, 7:48 PMDaniel Pitts
07/17/2024, 7:49 PMMark
07/18/2024, 6:31 AMawait()
is not within the lock and so it is still handling synchronization.