Norbi
01/17/2024, 5:04 PM
val connectionScope: CoroutineScope = ...
suspend fun withConnection(block: suspend () -> Unit) {
    connectionScope.launch {
        block()
    }.join()
}
I have a bad feeling that it breaks structured concurrency, but I haven't been able to figure out a better solution yet...
Thanks.
kevin.cianfarini
01/17/2024, 5:07 PM
Norbi
01/17/2024, 5:23 PM
withConnection() which runs the given code in the context of the current WebSocket connection (the WebSocket connection itself has a coroutine scope; it is built on Ktor).
If the connection fails, the code run by withConnection() should be cancelled and withConnection() should return normally, without an exception.
(It is more complex, e.g. callers of withConnection() are suspended until a connection is successfully established, but I hope the main concept is clear...)
Something like:
interface ConnectionManager {
    suspend fun withConnection(block: suspend (BidirectionalConnection) -> Unit)
}
suspend fun sendStatus() {
    val cm: ConnectionManager = ...
    while (true) {
        cm.withConnection {
            // Connected
            while (true) {
                it.send()
                delay(1000)
            }
        }
        // Disconnected
    }
}
I would like block (executed by withConnection()) to be cancelled in both of these cases:
• if the coroutine running sendStatus() is cancelled, AND
• if the coroutine scope of the current WebSocket connection is cancelled (but in this case I want withConnection() to return normally, without exceptions).
Norbi
01/17/2024, 5:40 PM
suspend fun withConnection(block: suspend () -> Unit) {
    val job = connectionScope.launch {
        block() // Cancelled in case of connection failure
    }
    try {
        job.join()
    } catch (e: CancellationException) {
        job.cancel() // Explicit cancel if this coroutine is cancelled
        throw e
    }
}
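A minimal runnable sketch of the two cancellation paths described above, assuming kotlinx.coroutines is on the classpath. The connection scope is passed as a parameter here (an assumption made so each case gets a fresh scope), and the helper names connectionFailureReturnsNormally and callerCancellationCancelsBlock are hypothetical. It relies on job.join() resuming normally when the joined job is cancelled, and throwing CancellationException only when the waiting coroutine itself is cancelled.

```kotlin
import kotlinx.coroutines.*

// Same logic as the function above; the scope is a parameter (assumption)
// so that each demo case can create a fresh "connection" scope.
suspend fun withConnection(connectionScope: CoroutineScope, block: suspend () -> Unit) {
    val job = connectionScope.launch { block() }
    try {
        job.join() // Resumes normally even if job was cancelled through its scope
    } catch (e: CancellationException) {
        job.cancel() // The caller was cancelled while waiting
        throw e
    }
}

// Case 1: the connection scope is cancelled while the block is running.
suspend fun connectionFailureReturnsNormally(): Boolean = coroutineScope {
    val connectionScope = CoroutineScope(Job() + Dispatchers.Default)
    val started = CompletableDeferred<Unit>()
    val caller = async {
        withConnection(connectionScope) {
            started.complete(Unit)
            awaitCancellation() // Suspends until the connection scope is cancelled
        }
        true // Reached only if withConnection returned without throwing
    }
    started.await()
    connectionScope.cancel() // Simulated connection failure
    caller.await()
}

// Case 2: the calling coroutine is cancelled while the block is running.
suspend fun callerCancellationCancelsBlock(): Boolean = coroutineScope {
    val connectionScope = CoroutineScope(Job() + Dispatchers.Default)
    val started = CompletableDeferred<Unit>()
    val blockCancelled = CompletableDeferred<Unit>()
    val caller = launch {
        withConnection(connectionScope) {
            started.complete(Unit)
            try {
                awaitCancellation()
            } finally {
                blockCancelled.complete(Unit) // Runs when the block is cancelled
            }
        }
    }
    started.await()
    caller.cancelAndJoin()
    // job.cancel() does not wait for the block, so wait for its finally explicitly.
    val observed = withTimeoutOrNull(1_000) { blockCancelled.await() } != null
    connectionScope.cancel()
    observed
}

fun main() = runBlocking {
    println("connection cancelled, returned normally: ${connectionFailureReturnsNormally()}")
    println("caller cancelled, block cancelled too: ${callerCancellationCancelsBlock()}")
}
```

Both lines should report true. In case 2 the block's completion has to be awaited separately, because job.cancel() only requests cancellation and returns immediately.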
streetsofboston
01/17/2024, 6:37 PM
Norbi
01/17/2024, 7:19 PM
bezrukov
01/18/2024, 9:10 AM
try {
    job.join()
} catch (e: CancellationException) {
    withContext(NonCancellable) {
        job.cancelAndJoin()
    }
    throw e
}
This is better because it mimics the behavior job would have if it ran in the same context. It ensures that job is not only cancelled but has also completed (cancellation is cooperative, so cancellation != completion). It is easier to understand with an example:
var mutableVariable = 0
flow {
    emit(100)
    delay(50)
    emit(400)
}.collectLatest { delay ->
    withConnection {
        // Some "blocking" call that does not cooperate with cancellation
        Thread.sleep(500L - delay) // 500L because Thread.sleep takes a Long
        mutableVariable = delay
    }
}
println(mutableVariable)
With just cancel() you will see unexpected behavior (mutableVariable will end up as 100, while 400 is expected). More generally, mutableVariable may be accessed concurrently and from parallel threads, depending on withConnection's context.
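The "cancellation != completion" point can be observed directly with a rough sketch like the following, assuming kotlinx.coroutines on the JVM. The waitForCompletion flag, the explicit scope parameter, and the helper blockFinishedWhenCallerCompleted are hypothetical additions that switch between the two variants; the 300 ms sleep is an arbitrary stand-in for a non-cooperative blocking call.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical variant of withConnection: waitForCompletion selects between
// plain cancel() and cancelAndJoin() inside NonCancellable.
suspend fun withConnection(
    scope: CoroutineScope,
    waitForCompletion: Boolean,
    block: suspend () -> Unit,
) {
    val job = scope.launch { block() }
    try {
        job.join()
    } catch (e: CancellationException) {
        if (waitForCompletion) {
            // Wait until the block has really finished, even though we are cancelled.
            withContext(NonCancellable) { job.cancelAndJoin() }
        } else {
            job.cancel() // Only requests cancellation and returns immediately
        }
        throw e
    }
}

// Reports whether the non-cooperative block had finished by the time
// the cancelled caller completed.
suspend fun blockFinishedWhenCallerCompleted(waitForCompletion: Boolean): Boolean =
    coroutineScope {
        val connectionScope = CoroutineScope(Job() + Dispatchers.Default)
        val started = CompletableDeferred<Unit>()
        val finished = AtomicBoolean(false)
        val caller = launch {
            withConnection(connectionScope, waitForCompletion) {
                started.complete(Unit)
                Thread.sleep(300) // Blocking call that ignores cancellation
                finished.set(true)
            }
        }
        started.await()
        caller.cancelAndJoin()
        val result = finished.get()
        connectionScope.cancel()
        result
    }

fun main() = runBlocking {
    println("cancel() only: block finished = ${blockFinishedWhenCallerCompleted(false)}")
    println("cancelAndJoin + NonCancellable: block finished = ${blockFinishedWhenCallerCompleted(true)}")
}
```

With cancel() only, the caller is expected to complete while the block is still sleeping (false); with cancelAndJoin() in NonCancellable, the caller completes only after the block has run to its end (true). The first result depends on cancellation finishing within the 300 ms window, which should hold on any normally loaded machine.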
With cancelAndJoin + NonCancellable it will work as expected.
Norbi
01/18/2024, 9:22 AM
CancellationException after job.cancelAndJoin()?
bezrukov
01/18/2024, 9:24 AM
Norbi
01/18/2024, 10:03 AM