Is this a correct way to run a block of code in an...
# coroutines
n
Is this a correct way to run a block of code in another coroutine scope and continue when the given coroutine scope is cancelled?
val connectionScope: CoroutineScope = ...

suspend fun withConnection(block: suspend () -> Unit) {
    connectionScope.launch {
        block()
    }.join()
}
I have a bad feeling that it breaks structured concurrency but I couldn't figure out a better solution yet... Thanks.
k
Can you talk more about your use case?
n
I have a class managing client-side WebSocket connections with automatic reconnection. It has a method withConnection() which runs the given code in the context of the current WebSocket connection (the WebSocket connection itself has a coroutine scope; it is built on Ktor). If the connection fails, the code run by withConnection() should be cancelled and withConnection() should return normally, without an exception. (It is more complex, e.g. callers of withConnection() are suspended until a connection is successfully established, but I hope the main concept is clear...) Something like:
interface ConnectionManager {

    suspend fun withConnection(block: suspend (BidirectionalConnection) -> Unit)
}

suspend fun sendStatus() {
    val cm: ConnectionManager = ...
    while (true) {
        cm.withConnection {
            // Connected
            while (true) {
                it.send()
                delay(1000)
            }
        }
        // Disconnected
    }
}
I would like block (executed by withConnection()) to be cancelled:
• if the coroutine running sendStatus() is cancelled, AND
• if the coroutine scope of the current WebSocket connection is cancelled (but in this case I want withConnection() to return normally, without exceptions).
2nd version? 🙂
suspend fun withConnection(block: suspend () -> Unit) {
    val job = connectionScope.launch {
        block() // Cancelled in case of connection failure
    }
    try {
        job.join()
    } catch (e: CancellationException) {
        job.cancel() // Explicit cancel if this coroutine is cancelled
        throw e
    }
}
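A minimal sketch of the "connection failure" case for the 2nd version, assuming kotlinx.coroutines is on the classpath. The scope definition and the function returnsNormallyOnConnectionFailure() are hypothetical stand-ins, not part of the original code. It relies on the documented behavior of Job.join(): it resumes normally when the joined job completes for any reason, and only throws if the caller's own job is cancelled.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for the scope of the current WebSocket connection.
val connectionScope = CoroutineScope(Job() + Dispatchers.Default)

suspend fun withConnection(block: suspend () -> Unit) {
    val job = connectionScope.launch {
        block() // Cancelled in case of connection failure
    }
    try {
        job.join() // Resumes normally even if `job` itself was cancelled
    } catch (e: CancellationException) {
        job.cancel() // Only reached when the calling coroutine is cancelled
        throw e
    }
}

// Hypothetical demo: simulates a connection failure while the block is running.
fun returnsNormallyOnConnectionFailure(): Boolean = runBlocking {
    launch {
        delay(100)
        connectionScope.cancel() // Simulated connection failure
    }
    withConnection {
        delay(10_000) // Suspended here when connectionScope is cancelled
    }
    // Reached only if withConnection() returned without throwing.
    true
}
```

Calling returnsNormallyOnConnectionFailure() from a main function should come back after roughly 100 ms instead of waiting for the 10 second delay, because the block is cancelled together with connectionScope.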
s
You can "cross" coroutine scopes to have components with different lifecycles interact with each other. https://medium.com/swlh/how-can-we-use-coroutinescopes-in-kotlin-2210695f0e89 (see the "Switching CoroutineScopes" section near the bottom)
n
Thanks. Maybe my second solution is not bad at all 🙂
b
For your second solution: in our codebase we have an extension that does a similar thing, but we use cancelAndJoin(), and in addition we call it in a NonCancellable context:
try {
   job.join()
} catch (e: CancellationException) {
   withContext(NonCancellable) {
      job.cancelAndJoin()
   }
   throw e
}
This is better because it mimics the behavior you would get if job ran in the same context. It ensures the job has not only been cancelled but has also completed (cancellation is cooperative, so cancellation != completion). It is easier to understand with an example:
var mutableVariable = 0
flow { 
   emit(100)
   delay(50)
   emit(400)
}.collectLatest { delay -> 
   withConnection {
      Thread.sleep(500L - delay) // some "blocking" call that is not cooperative to cancellation (Thread.sleep takes a Long)
      mutableVariable = delay
   }
}
println(mutableVariable)
With just cancel() you can see unexpected behavior (mutableVariable may end up as 100 when 400 is expected), because the first block keeps running after withConnection() has already returned. Overall, mutableVariable may be accessed concurrently and from parallel threads, depending on withConnection's context. With cancelAndJoin() + NonCancellable it works as expected.
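A minimal sketch of the "caller is cancelled" case with cancelAndJoin() inside NonCancellable, assuming kotlinx.coroutines is on the classpath. The scope definition, the function blockFinishedBeforeCallerCompleted() and the `started` signal are hypothetical and only there to make the demo self-contained. It checks that the non-cooperative block has finished by the time the cancelled caller completes.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the scope of the current WebSocket connection.
val connectionScope = CoroutineScope(Job() + Dispatchers.Default)

suspend fun withConnection(block: suspend () -> Unit) {
    val job = connectionScope.launch { block() }
    try {
        job.join()
    } catch (e: CancellationException) {
        // The caller was cancelled: cancel the block and wait until it has
        // really completed. NonCancellable lets us suspend while cancelled.
        withContext(NonCancellable) {
            job.cancelAndJoin()
        }
        throw e
    }
}

// Hypothetical demo: cancels the caller while the block is in a blocking call.
fun blockFinishedBeforeCallerCompleted(): Boolean = runBlocking {
    val started = CompletableDeferred<Unit>()
    val blockFinished = AtomicBoolean(false)

    val caller = launch {
        withConnection {
            started.complete(Unit)
            Thread.sleep(300) // Blocking call that ignores cancellation
            blockFinished.set(true)
        }
    }

    started.await()         // Make sure the block is already running
    caller.cancelAndJoin()  // Cancel the caller and wait for it to complete
    blockFinished.get()     // Whether the block had finished by then
}
```

With a plain job.cancel() in the catch branch, the caller could complete while the block is still inside Thread.sleep(), so the same check would be expected to observe false.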
n
Hmmm, and you don't rethrow the CancellationException after job.cancelAndJoin()?
b
We do. I was writing the code in Slack, not copying it, so I forgot about that part.
n
Thanks @bezrukov, this helped a lot (although I will have to think more about the cancellation behavior you described 😅). (Maybe please edit the code snippet, so if anyone else finds it in the future, it will contain the rethrow as well...)