Toby
11/14/2023, 11:37 PMfor (msg in channel) { ... }
This initial coroutine was created via CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO>).launch { .. }
Before entering that loop, I launch another coroutine, which is regularly send()ing messages into the channel.
eg. while (true) { delay(..); channel.send(..) }
Initially I was creating the second coroutine with coroutineScope { launch { ... } }
, but later switched to CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO>).launch { .. }
.
I switched because in the first version, it felt like this coroutine was able to block the other one!
If the _channel capacity was exceeded, this second coroutine starts to block on the channel.send() call, which is expected. However, it also seemed to be blocking my first coroutine too, which I did not expect.
Can you help me understand what was happening here?
And, also, I'm concerned about whether my second coroutine will still be properly considered a child of the first co-routine, with regard to it being properly cancelled if the parent is cancelled.Joffrey
11/15/2023, 12:11 AMcoroutineScope { launch { ... } }
is the same as ...
Jacob
11/15/2023, 12:11 AMToby
11/15/2023, 12:14 AMJob
that can run in parallel if you launch
inside your own coro scope?ephemient
11/15/2023, 12:25 AMcoroutineScope { ... }
(or any other normal suspend fun
) returns, everything it has launched has also completed. this is the structure in structured concurrencyephemient
11/15/2023, 12:26 AMlaunch
isn't such a function; it's an extension on CoroutineScope
, indicating that it launches a coroutine in there, not bounded by the function's own lifetimeephemient
11/15/2023, 12:27 AMCoroutineScope(...).launch { }
. that results in breaking the parent-child job relationsToby
11/15/2023, 12:30 AMephemient
11/15/2023, 12:34 AMcoroutineScope {
launch { one() }
coroutineScope {
launch { two() }
launch { three() }
}
launch { four() }
}
one+two+three may run concurrently. one+four may run concurrently. but two+three will never run concurrently with four; they must be complete before control flows to where four is launched.
(regardless of whether you split out some parts of this into other functions or not)Toby
11/15/2023, 12:37 AMToby
11/15/2023, 12:41 AMimport kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
suspend fun outer(channel: Channel<String>) {
// coroutineScope { launch {
CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO>).launch {
inner(channel)
}
for (message in channel) {
println("Hello ${message}")
}
}
suspend fun inner(channel: Channel<String>) {
for (i in 1..10) {
delay(1000)
channel.send("inner coro")
}
channel.close()
}
runBlocking {
val chan = Channel<String>(2)
CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO>).launch {
outer(chan)
}
delay(11000)
}
Toby
11/15/2023, 12:42 AMCoroutineScope.launch
method of creating another coroutine, then do you mind helping me understand the correct method to use?Toby
11/15/2023, 12:44 AMJacob
11/15/2023, 12:47 AMsuspend fun inner(channel: Channel<String>) {
for (i in 1..10) {
delay(1000)
channel.send("inner coro")
}
channel.close()
}
runBlocking {
val channel = Channel<String>(2)
launch {
inner(channel)
}
for (s in channel) {
println("Hello $s")
}
}
doesn’t work?ephemient
11/15/2023, 12:48 AMCoroutineScope()
is
suspend fun outer(channel: Channel<String>): Unit = coroutineScope {
launch(<http://Dispatchers.IO|Dispatchers.IO>) {
inner(channel)
}
for (message in channel) {
println("Hello ${message}")
}
}
runBlocking {
val chan = Channel<String>(2)
withContext(<http://Dispatchers.IO|Dispatchers.IO>) {
withTimeout(11000) {
outer(chan)
}
}
}
and everything should run concurrently as expectedToby
11/15/2023, 12:49 AMouter()
function call.Toby
11/15/2023, 12:49 AMephemient
11/15/2023, 12:50 AMouter
leaks past its lifetime to its caller - such as whether it `launch`es child coroutinesToby
11/15/2023, 12:50 AMToby
11/15/2023, 12:57 AMcoroutineScope { launch { ... } }
and that changes to this in your code:
coroutineScope { launch(<http://Dispatchers.IO|Dispatchers.IO>) { ... } }
So, by adding the dispatcher context to launch(), that is actually letting us have another parallel worker, unlike the plain launch {
version?Toby
11/15/2023, 12:57 AMephemient
11/15/2023, 12:58 AMephemient
11/15/2023, 12:59 AMlaunch(context) { ... }
is equivalent to launch { withContext(context) { ... } }
ephemient
11/15/2023, 1:00 AMf()
which blocks the thread, and you are currently executing in a single-threaded dispatcher (such as runBlocking
), then both f()
and launch { f() }
will prevent that single thread from making progressephemient
11/15/2023, 1:00 AMwithContext(<http://Dispatchers.IO|Dispatchers.IO>) { f() }
will use a built-in threadpoolToby
11/15/2023, 1:01 AMdelay()
and channel.send()
are both suspend functions, yeah?ephemient
11/15/2023, 1:02 AMephemient
11/15/2023, 1:02 AMToby
11/15/2023, 1:04 AMephemient
11/15/2023, 1:06 AMCoroutineScope(...).launch {}
)?ephemient
11/15/2023, 1:06 AMToby
11/15/2023, 1:08 AMephemient
11/15/2023, 1:08 AMToby
11/15/2023, 1:10 AMToby
11/15/2023, 1:11 AMToby
11/15/2023, 1:11 AMephemient
11/15/2023, 1:13 AMToby
11/15/2023, 1:15 AMToby
11/15/2023, 1:15 AMephemient
11/15/2023, 1:18 AMephemient
11/15/2023, 1:19 AMephemient
11/15/2023, 1:19 AMToby
11/15/2023, 1:26 AMToby
11/15/2023, 1:27 AMToby
11/15/2023, 1:27 AMephemient
11/15/2023, 1:29 AMToby
11/15/2023, 1:30 AMToby
11/15/2023, 1:30 AMephemient
11/15/2023, 1:33 AMToby
11/15/2023, 1:33 AMToby
11/15/2023, 1:34 AMToby
11/15/2023, 2:14 AMcoroutineScope { launch { ... } }
methods now, but my large system still hangs unless I use CoroutineScope(Dispatchers.Default).launch { .. }
Toby
11/15/2023, 2:15 AMephemient
11/15/2023, 2:16 AMDispatchers.Default
and <http://Dispatchers.IO|Dispatchers.IO>
actually share threads, so that "switching" from one dispatcher to the other can be optimizedToby
11/15/2023, 2:16 AMlaunch {}
wants to wait for the routine inside it to complete, yet in my system, that's an infinite loop (until cancelled)Jacob
11/15/2023, 2:16 AMToby
11/15/2023, 2:16 AMToby
11/15/2023, 2:20 AMdelay; channel.send()
like my example.
But I never see the code get past the launch {} that kicks it off.
Which makes sense from the structured concurrency thing mentioned earlier I guess. But I guess I don't know what the right way is to have a properly asynchronous child coroutine?Jacob
11/15/2023, 2:26 AMToby
11/15/2023, 2:43 AMephemient
11/15/2023, 2:45 AMwithContext(EmptyCoroutineContext)
), and is the way you are supposed to use to make use of CoroutineScope.*
extensions inside a suspend fun
Toby
11/15/2023, 2:46 AMlaunch { inner() }
is called in outer().
Whereas in my real code, it was the equivalent of:
fun outer {
launchInnerThing()
doStuff...
}
fun launchInnerThing() {
coroutineScope { launch {
while (true) { // do stuff }
} }
}
ephemient
11/15/2023, 2:47 AMcoroutineScope { launch { ... } }
does not return until the body of the launch is completeToby
11/15/2023, 2:49 AMcoroutineScope
occurring twice in your own example.. I hadn't quite made the connectionJacob
11/15/2023, 2:50 AMfun CoroutineScope.launchInnerThing()
and lose the inner coroutineScope builder (and DON’T mark it as suspend!)ephemient
11/15/2023, 2:51 AMephemient
11/15/2023, 2:52 AMsuspend fun runInner()
that callers will launch {}
if they want to perform other actions concurrently withephemient
11/15/2023, 2:54 AMsuspend fun
XOR CoroutineScope
receiver, never both. https://elizarov.medium.com/structured-concurrency-722d765aa952Jacob
11/15/2023, 2:54 AMToby
11/15/2023, 2:57 AMToby
11/15/2023, 2:57 AMDaniel Pitts
11/15/2023, 3:17 AMlouiscad
11/15/2023, 3:44 AMCoroutineScope(…).launch { }
can lead to silent malfunction if any underlying callback happens to be registered in a WeakReference (something you might not know): https://github.com/Kotlin/kotlinx.coroutines/issues/1061Toby
11/15/2023, 4:04 AMJoffrey
11/15/2023, 8:55 AMcoroutineScope {
launch {
doStuff1()
}
doStuff2()
}
In this case doStuff1
and doStuff2
run concurrently, because coroutineScope
only waits for the launch at the end of the block. That's how you construct hierarchies of concurrent coroutines. Ephemient had mentioned it but I'm not sure it was clear for you.Toby
11/15/2023, 9:51 AMJoffrey
11/15/2023, 10:12 AMcoroutineScope
into a suspend function (I mean it doesn't change the behaviour). It is itself a suspend function. What matters is the placement of code inside or outside the launch
or the coroutineScope
blockToby
11/15/2023, 10:52 AMcoroutineScope { ... }
is essentially a blocker, that won't return until everything inside it has completed -- including if what it contains are launch {...}
statements.
If you want two coroutineScopes to be running in parallel, they need to be inside launch
statements at a higher scope.
ie
runBlocking {
launch {
coroutineScope {
launch { verySlowThing() }
doAnotherRelatedThing()
}
}
launch {
coroutineScope {
launch { oneThing }
launch { otherRelatedThing() }
}
}
}
And in fact, I don't need the coroutineScope
inside launch
if I'm only doing one thing in there.Toby
11/15/2023, 10:55 AMlaunch {
inside a regular suspend function -- I need to have the CoroutineScope available.
This kinda forces me to use suspend fun FooBar() = coroutineScope {
though, which then has the effect of requiring everything inside the function to complete before the whole function returns -- including things I launched.
I'm beginning to understand how this is intentional now, but also leads me to wonder:
Is there a way to get access to launch
without forcing me to create a new coroutineScope?Toby
11/15/2023, 10:55 AMlouiscad
11/15/2023, 10:57 AMCoroutineScope(…)
, or the result of the only launch
result (its Job
), or you can risk the GC coming for it. And mind how Exception handling and cancellation propagation isn't going to work the same.louiscad
11/15/2023, 11:00 AMcoroutineScope { … }
, so you need to think more about it to avoid having uncaught exceptions that crash your program, or coroutines that keep running when they should have been cancelled.Toby
11/15/2023, 11:03 AMJacob
11/15/2023, 1:11 PMbut also leads me to wonder:
Is there a way to get access to launch without forcing me to create a new coroutineScope?
Yes, as I mentioned earlier, you can make your function an extension function on CoroutineScope and not suspend. Also see https://elizarov.medium.com/coroutine-context-and-scope-c8b255d59055
If you need to launch a coroutine that keeps running after your function returns, then make your function an extension of CoroutineScope or pass scope: CoroutineScope as parameter to make your intent clear in your function signature. Do not make these functions suspending:
Jacob
11/15/2023, 2:16 PM