#coroutines

Toby

11/14/2023, 11:37 PM
hey folks, I have what is probably a naive question while I'm learning about Kotlin's coroutines. I was confused by the behaviour of switching between two different ways of launching a sub-coroutine, and wanted to make sure I understood the consequences properly. My scenario is that I have a long-running coroutine listening to a channel and acting on it, e.g. `for (msg in channel) { ... }`. This initial coroutine was created via `CoroutineScope(Dispatchers.IO).launch { .. }`.
Before entering that loop, I launch another coroutine, which is regularly `send()`ing messages into the channel, e.g. `while (true) { delay(..); channel.send(..) }`. Initially I was creating the second coroutine with `coroutineScope { launch { ... } }`, but later switched to `CoroutineScope(Dispatchers.IO).launch { .. }`.
I switched because in the first version, it felt like this coroutine was able to block the other one! If the channel capacity was exceeded, this second coroutine starts to block on the `channel.send()` call, which is expected. However, it also seemed to be blocking my first coroutine too, which I did not expect. Can you help me understand what was happening here? Also, I'm concerned about whether my second coroutine will still be properly considered a child of the first coroutine, with regard to it being properly cancelled if the parent is cancelled.

Joffrey

11/15/2023, 12:11 AM
`coroutineScope { launch { ... } }` is the same as ...

Jacob

11/15/2023, 12:11 AM
I'm finding this hard to follow without seeing the actual code. Note that with `CoroutineScope().launch()` you usually need to capture the return value of `CoroutineScope()` so that you can clean it up if needed, unlike `coroutineScope { launch { } }`, where structured concurrency will handle most things for you.

Toby

11/15/2023, 12:14 AM
Joffrey: Oh! Right.. that's interesting. So it doesn't create a new `Job` that can run in parallel if you `launch` inside your own coro scope?

ephemient

11/15/2023, 12:25 AM
when `coroutineScope { ... }` (or any other normal `suspend fun`) returns, everything it has launched has also completed. this is the structure in structured concurrency
`launch` isn't such a function; it's an extension on `CoroutineScope`, indicating that it launches a coroutine in there, not bounded by the function's own lifetime
do not use `CoroutineScope(...).launch { }`. that results in breaking the parent-child job relations
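A minimal sketch of the point above, assuming kotlinx.coroutines is on the classpath. The function name `structuredDemo` and the event strings are hypothetical, chosen only for illustration. It records the order of events to show that `coroutineScope` returns only after the coroutine launched inside it has finished, even though the scope's own body finishes first.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which things happen.
suspend fun structuredDemo(): List<String> {
    val events = mutableListOf<String>()
    coroutineScope {
        launch {
            delay(100)
            events += "child finished"
        }
        // The body does not wait for the child here; it runs on immediately.
        events += "scope body finished"
    }
    // Reached only after the launched child has completed.
    events += "coroutineScope returned"
    return events
}

fun main() = runBlocking {
    // prints [scope body finished, child finished, coroutineScope returned]
    println(structuredDemo())
}
```

If the launched child were an infinite loop, the last event would never be recorded, which matches the "hang" described earlier in the thread.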

Toby

11/15/2023, 12:30 AM
I had a feeling that was going to be the case (breaking the parent-child relationship). But I'm not quite sure of the right way to get an asynchronous job running, then, without blocking the parent? (Which is perhaps happening due to a side effect of something else..)

ephemient

11/15/2023, 12:34 AM
coroutineScope {
    launch { one() }
    coroutineScope {
        launch { two() }
        launch { three() }
    }
    launch { four() }
}
one+two+three may run concurrently. one+four may run concurrently. but two+three will never run concurrently with four; they must be complete before control flows to where four is launched. (regardless of whether you split out some parts of this into other functions or not)
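To make that ordering observable, here is a hedged sketch that logs when each step starts and ends. The helper `step`, the function `orderingDemo`, and the delay values are assumptions made for the illustration; only the nesting of `coroutineScope` and `launch` mirrors the snippet above.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for one(), two(), three() and four().
suspend fun step(log: MutableList<String>, name: String, millis: Long) {
    log += "$name start"
    delay(millis)
    log += "$name end"
}

suspend fun orderingDemo(): List<String> {
    val log = mutableListOf<String>()
    coroutineScope {
        launch { step(log, "one", 500) }
        coroutineScope {
            launch { step(log, "two", 100) }
            launch { step(log, "three", 200) }
        }
        // Control only reaches this line once two and three have ended.
        launch { step(log, "four", 100) }
    }
    return log
}

fun main() = runBlocking {
    orderingDemo().forEach(::println)
}
```

In the printed log, "four start" always appears after both "two end" and "three end", while "one" overlaps with all of the others.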

Toby

11/15/2023, 12:37 AM
That does sound like what I'm seeing.
Jacob asked for an example, so here's some code:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

suspend fun outer(channel: Channel<String>) {
    // coroutineScope { launch {
    CoroutineScope(Dispatchers.IO).launch {
        inner(channel)
    }

    for (message in channel) {
        println("Hello ${message}")
    }
}

suspend fun inner(channel: Channel<String>) {
    for (i in 1..10) {
        delay(1000)
        channel.send("inner coro")
    }
    channel.close()
}

runBlocking {
    val chan = Channel<String>(2)
    CoroutineScope(Dispatchers.IO).launch {
        outer(chan)
    }
    delay(11000)
}
If I shouldn't be using the `CoroutineScope.launch` method of creating another coroutine, then do you mind helping me understand the correct method to use?
(or feel free to redirect me to a good bit of docs.. but I was staring at the Kotlin coroutines docs for hours already without quite making this come together in my mind)

Jacob

11/15/2023, 12:47 AM
suspend fun inner(channel: Channel<String>) {
    for (i in 1..10) {
        delay(1000)
        channel.send("inner coro")
    }
    channel.close()
}

runBlocking {
    val channel = Channel<String>(2)
    launch {
        inner(channel)
    }

    for (s in channel) {
        println("Hello $s")
    }
}
doesn’t work?

ephemient

11/15/2023, 12:48 AM
the immediate way to get rid of `CoroutineScope()` is
suspend fun outer(channel: Channel<String>): Unit = coroutineScope {
    launch(Dispatchers.IO) {
        inner(channel)
    }

    for (message in channel) {
        println("Hello ${message}")
    }
}

runBlocking {
    val chan = Channel<String>(2)
    withContext(Dispatchers.IO) {
        withTimeout(11000) {
            outer(chan)
        }
    }
}
and everything should run concurrently as expected

Toby

11/15/2023, 12:49 AM
oh, for the sake of the example, I'm specifically trying to launch the inner routine from the outer routine. Imagine that from a caller's perspective, it only wants to know about the `outer()` function call.
(in reply to Jacob)

ephemient

11/15/2023, 12:50 AM
nothing about `outer` leaks past its lifetime to its caller - such as whether it `launch`es child coroutines

Toby

11/15/2023, 12:50 AM
Thanks.. just digesting your changes, ephemient
OK, that mostly makes sense to me. In the (non-working) version of my example, I had `coroutineScope { launch { ... } }` and that changes to this in your code: `coroutineScope { launch(Dispatchers.IO) { ... } }`
So, by adding the dispatcher context to `launch()`, that is actually letting us have another parallel worker, unlike the plain `launch {` version?
I feel like there's some nuance here, so I'm just keen to understand it

ephemient

11/15/2023, 12:58 AM
no
`launch(context) { ... }` is equivalent to `launch { withContext(context) { ... } }`
if you have a function `f()` which blocks the thread, and you are currently executing in a single-threaded dispatcher (such as `runBlocking`), then both `f()` and `launch { f() }` will prevent that single thread from making progress
but `withContext(Dispatchers.IO) { f() }` will use a built-in threadpool
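A rough sketch of that difference, assuming a JVM target. Here `f()` is a hypothetical stand-in that blocks its thread with `Thread.sleep`, and the two timing helpers are made up for the illustration. When called from `runBlocking`, the first helper runs both blocking calls one after the other on the single `runBlocking` thread, while the second lets `Dispatchers.IO` run them on separate pool threads.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for a function that blocks its thread (it does not suspend).
fun f() = Thread.sleep(200)

// Both children inherit the caller's dispatcher.
suspend fun timeOnCallerThread(): Long = measureTimeMillis {
    coroutineScope {
        launch { f() }
        launch { f() }
    }
}

// Both children are moved to the shared IO thread pool.
suspend fun timeOnIo(): Long = measureTimeMillis {
    coroutineScope {
        launch(Dispatchers.IO) { f() }
        launch(Dispatchers.IO) { f() }
    }
}

fun main() = runBlocking {
    println("caller thread: ${timeOnCallerThread()} ms")
    println("Dispatchers.IO: ${timeOnIo()} ms")
}
```

On the single `runBlocking` thread the first figure should come out at roughly the sum of the two sleeps, and the second at roughly one sleep. None of this applies to `delay()` or `channel.send()`, which suspend rather than block.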

Toby

11/15/2023, 1:01 AM
ah, got it! However, that adds a bit of confusion -- because in my example, I didn't think there was anything that was actually hard-blocking like that. as in, `delay()` and `channel.send()` are both suspend functions, yeah?

ephemient

11/15/2023, 1:02 AM
they are
I assumed that was a cut-down example

Toby

11/15/2023, 1:04 AM
It's slightly cut down, but it does demonstrate my issue -- that the two loops don't seem to run concurrently, even though both are suspend functions and not hard blocking

ephemient

11/15/2023, 1:06 AM
maybe it only looked stuck because your program was never terminating (due to the `CoroutineScope(...).launch {}`)?
https://pl.kotl.in/Mzszu6nPI shows that even `runBlocking` works, without `IO`

Toby

11/15/2023, 1:08 AM
Maybe, but I thought I'd see I/O output while it was running, even if it never terminated?

ephemient

11/15/2023, 1:08 AM
might be buffered by whatever's running your program (such as Gradle)

Toby

11/15/2023, 1:10 AM
it's unbuffered, if I use the version with Dispatchers.IO
I'll play around for a bit and see if there is something in that concept though -- that it's stuck waiting for something to terminate
Thanks for your help so far -- appreciate your time explaining this

ephemient

11/15/2023, 1:13 AM
sure. it does seem that everybody comes into this with a different intuition at first, which doesn't always match how kotlinx.coroutines was designed to behave

Toby

11/15/2023, 1:15 AM
I know all the ins and outs and oddities of Akka Actors. 😂
Coming from Scala's concurrency models, it does take a bit of a shift to get used to Kotlin's

ephemient

11/15/2023, 1:18 AM
from what I know, it's using the same model as Java, so unstructured concurrency - anything goes, anywhere
but even Java is coming around to realize that structured concurrency is necessary for being able to reason sanely about large systems
https://openjdk.org/jeps/428 I haven't seen anything for Scala though

Toby

11/15/2023, 1:26 AM
Scala's Akka's Actors were a bit like coroutines and channels.. All the Actors ran in the same single thread, unless you specifically told them to run in another context. Actors were meant to be non-blocking. Actors would process a message off their incoming queue (like a kotlin channel) and send results out in the same way.
A supervisor would be watching all the incoming letterboxes for Actors, and choose which one would get to run to pick up a message to process it.
Actors were very lightweight to create and have exist, like coroutines.

ephemient

11/15/2023, 1:29 AM
I mean in that an actor passing work to other actors doesn't retain any chain of ownership (afaik)

Toby

11/15/2023, 1:30 AM
An Actor can create child actors, and those work like structured concurrency, where if the parent actor exits/dies, the children are automatically cleaned up.
(And the parent receives notifications about deaths of its children)

ephemient

11/15/2023, 1:33 AM
I feel like it's hard to explain this - you can implement actors on coroutines, but the other way around doesn't make much sense

Toby

11/15/2023, 1:33 AM
It's OK, I'm more mentioning this in case you were curious about where my biases come from 🙂
I'd like to write idiomatic Kotlin coroutines, rather than just implement Scala style on them
For what it's worth.. I have the simple example behaving properly, with just the `coroutineScope { launch { ... } }` methods now, but my large system still hangs unless I use `CoroutineScope(Dispatchers.Default).launch { .. }`
It's interesting that it's fine with the Default dispatcher though; doesn't need the threading that comes from the IO one.

ephemient

11/15/2023, 2:16 AM
`Dispatchers.Default` and `Dispatchers.IO` actually share threads, so that "switching" from one dispatcher to the other can be optimized

Toby

11/15/2023, 2:16 AM
The issue does seem to be that the `launch {}` wants to wait for the routine inside it to complete, yet in my system, that's an infinite loop (until cancelled)

Jacob

11/15/2023, 2:16 AM
The default dispatcher has threads too. It just usually has far fewer. A common setup is 4 for Default and 64 for IO

Toby

11/15/2023, 2:16 AM
Ah.. right.
My child (like the inner() function in my example) is basically just a `delay; channel.send()` like my example. But I never see the code get past the `launch {}` that kicks it off. Which makes sense from the structured concurrency thing mentioned earlier I guess. But I guess I don't know what the right way is to have a properly asynchronous child coroutine?

Jacob

11/15/2023, 2:26 AM
Launch is the right way as in the previous example

Toby

11/15/2023, 2:43 AM
What is the significance of having coroutineScope twice, in the snippet there?

ephemient

11/15/2023, 2:45 AM
it introduces a child scope (just like `withContext(EmptyCoroutineContext)`), and is the way you are supposed to make use of `CoroutineScope.*` extensions inside a `suspend fun`

Toby

11/15/2023, 2:46 AM
So I think I've worked out what is different about my real code vs the example code I pasted.. and it's that in the example, `launch { inner() }` is called in outer(). Whereas in my real code, it was the equivalent of:
suspend fun outer() {
    launchInnerThing()  // does not return while the loop below is still running
    doStuff()           // ...
}

suspend fun launchInnerThing() {
    coroutineScope {
        launch {
            while (true) { /* do stuff */ }
        }
    }
}

ephemient

11/15/2023, 2:47 AM
yes, as we said previously, `coroutineScope { launch { ... } }` does not return until the body of the launch is complete

Toby

11/15/2023, 2:49 AM
That's why I was checking about the significance of `coroutineScope` occurring twice in your own example.. I hadn't quite made the connection

Jacob

11/15/2023, 2:50 AM
but you can redefine launchInnerThing as `fun CoroutineScope.launchInnerThing()` and lose the inner coroutineScope builder (and DON’T mark it as suspend!)

ephemient

11/15/2023, 2:51 AM
you can but I don't think that conveys the intent very well and makes it harder for the caller to actually know what the bounds are
in general, just like non-suspend functions, a suspend function should not silently continue to do things after it has returned. I'd rather have a `suspend fun runInner()` that callers will `launch {}` if they want to perform other actions concurrently with it
to Jacob's point though: the convention in kotlinx.coroutines is `suspend fun` XOR `CoroutineScope` receiver, never both. https://elizarov.medium.com/structured-concurrency-722d765aa952
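One way that suggestion could look, sketched under assumptions: `runInner` here takes the channel as a parameter and sends three messages instead of looping forever, and this `outer` returns what it received so the result can be inspected. Neither signature comes from the thread. The producer is an ordinary suspend function that launches nothing, and the caller decides to run it concurrently by wrapping it in `launch`.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Plain suspend function: it returns only when its own work is done.
suspend fun runInner(channel: Channel<String>) {
    for (i in 1..3) {
        delay(100)
        channel.send("inner coro $i")
    }
    channel.close()
}

// The caller chooses concurrency: the producer runs while the loop below consumes.
suspend fun outer(channel: Channel<String>): List<String> = coroutineScope {
    launch { runInner(channel) }
    val received = mutableListOf<String>()
    for (message in channel) received += message
    received
}

fun main() = runBlocking {
    // prints [inner coro 1, inner coro 2, inner coro 3]
    println(outer(Channel<String>(2)))
}
```

Because the `launch` and the consuming loop sit in the same `coroutineScope` block, they overlap, and `outer` still returns only once both are finished.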

Jacob

11/15/2023, 2:54 AM
I’d reverse the phrasing of that: in general, just like suspend functions, a non-suspend function should… but otherwise 👍

Toby

11/15/2023, 2:57 AM
I agree -- it's not good behaviour for a function to just decide to keep running in the background. My bad for writing it! I am just experimenting around, and hadn't realised the significance of suspend vs non-suspend functions vs coroutine scope.
I'm learning a lot from this discussion though

Daniel Pitts

11/15/2023, 3:17 AM
You might actually be better off with a Flow rather than a Channel.
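For comparison, a minimal sketch of what a Flow-based producer might look like, assuming the messages do not need to be buffered ahead of the consumer. The function name `innerMessages` and the message text are hypothetical. Note the different trade-off: a cold `flow { }` runs its body inside the collector's coroutine, so no separate coroutine is launched at all.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical replacement for the channel-based inner() producer.
fun innerMessages(): Flow<String> = flow {
    for (i in 1..3) {
        delay(100)
        emit("inner coro $i")
    }
}

fun main() = runBlocking {
    // Collection ends when the flow body ends; there is nothing to close or cancel.
    innerMessages().collect { println("Hello $it") }
}
```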

louiscad

11/15/2023, 3:44 AM
`CoroutineScope(…).launch { }` can lead to silent malfunction if any underlying callback happens to be registered in a WeakReference (something you might not know): https://github.com/Kotlin/kotlinx.coroutines/issues/1061

Toby

11/15/2023, 4:04 AM
Thanks for the tip

Joffrey

11/15/2023, 8:55 AM
Sorry Toby I was sleeping before answering your question but I guess you got a good conversation there already. One thing I haven't seen clearly mentioned, and which might be the source of your confusion, was that the following does express concurrency like you want to:
coroutineScope {
    launch {
        doStuff1()
    }
    doStuff2()
}
In this case `doStuff1` and `doStuff2` run concurrently, because `coroutineScope` only waits for the launch at the end of the block. That's how you construct hierarchies of concurrent coroutines. Ephemient had mentioned it but I'm not sure it was clear for you.

Toby

11/15/2023, 9:51 AM
I think the root cause of my confusion is the way that the placement of coroutineScope and launch matters, depending on whether it's inside a separate suspend function. (At least, that was one of the things I've taken away from the conversation. And need to experiment with further to truly understand.)

Joffrey

11/15/2023, 10:12 AM
Mmmh it doesn't matter whether you extract `coroutineScope` into a suspend function (I mean it doesn't change the behaviour). It is itself a suspend function. What matters is the placement of code inside or outside the `launch` or the `coroutineScope` block

Toby

11/15/2023, 10:52 AM
I think I am, slowly, starting to get it. So, a `coroutineScope { ... }` is essentially a blocker, that won't return until everything inside it has completed -- including if what it contains are `launch {...}` statements. If you want two coroutineScopes to be running in parallel, they need to be inside `launch` statements at a higher scope. ie
runBlocking {
    launch {
        coroutineScope {
            launch { verySlowThing() }
            doAnotherRelatedThing()
        }
    }
    launch {
        coroutineScope {
            launch { oneThing() }
            launch { otherRelatedThing() }
        }
    }
}
And in fact, I don't need the `coroutineScope` inside `launch` if I'm only doing one thing in there.
This leads me to another question though.. I can't just call `launch {` inside a regular suspend function -- I need to have the CoroutineScope available. This kinda forces me to use `suspend fun FooBar() = coroutineScope {` though, which then has the effect of requiring everything inside the function to complete before the whole function returns -- including things I launched. I'm beginning to understand how this is intentional now, but also leads me to wonder: Is there a way to get access to `launch` without forcing me to create a new coroutineScope?
I get the feeling this might be tricky to do by design, to really encourage the structured concurrency thing. I'm fine if that is the case, but I thought I should check.

louiscad

11/15/2023, 10:57 AM
Yes, but make sure there's still a strong reference to this `CoroutineScope(…)`, or to the `Job` returned by its only `launch`, or you risk the GC coming for it. And mind that exception handling and cancellation propagation aren't going to work the same.
Cancellation and error handling basically need to be handled beyond the scope of the function when you're not using a local `coroutineScope { … }`, so you need to think more about it to avoid having uncaught exceptions that crash your program, or coroutines that keep running when they should have been cancelled.
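A sketch of what owning such a scope can involve, under stated assumptions: the class `Ticker`, its methods, and the choice of `SupervisorJob() + Dispatchers.Default` are all hypothetical and not taken from the thread. The owner keeps a strong reference to the scope in a field and is responsible for cancelling it, because no parent coroutine will do so.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical component that owns its scope and keeps a strong reference to it.
class Ticker(private val channel: Channel<String>) {
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    // Not a child of the caller: cancelling the caller does not stop this loop.
    fun start(): Job = scope.launch {
        while (true) {
            delay(50)
            channel.send("tick")
        }
    }

    // Cancellation must be requested explicitly by whoever owns the Ticker.
    fun close() = scope.cancel()
}

fun main() = runBlocking {
    val channel = Channel<String>(2)
    val ticker = Ticker(channel)
    val job = ticker.start()
    println(channel.receive())               // prints tick
    ticker.close()
    job.join()
    println("cancelled: ${job.isCancelled}") // prints cancelled: true
}
```

If `close()` were never called, the loop would keep running after `main` finished with it, which is the kind of leak the message above warns about.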

Toby

11/15/2023, 11:03 AM
Thanks. I hope that's an area I have much more familiarity with -- ie. I'm familiar with managing worker threads, which sound like they have the same issues of error and cancellation management

Jacob

11/15/2023, 1:11 PM
"but also leads me to wonder: Is there a way to get access to launch without forcing me to create a new coroutineScope?"
Yes, as I mentioned earlier, you can make your function an extension function on CoroutineScope and not suspend. Also see https://elizarov.medium.com/coroutine-context-and-scope-c8b255d59055
If you need to launch a coroutine that keeps running after your function returns, then make your function an extension of CoroutineScope or pass scope: CoroutineScope as parameter to make your intent clear in your function signature. Do not make these functions suspending:
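The code that followed this quote in the article is not reproduced in the log. As a stand-in, here is a hedged sketch of the convention being described, not the article's own example. The name `launchInnerThing` comes from earlier in the thread; its `channel` parameter and `Job` return type are assumptions. The function is a non-suspending extension on `CoroutineScope`, so it returns immediately and the coroutine it starts belongs to the receiver scope.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Not suspend: the signature tells the caller that work continues in the receiver scope.
fun CoroutineScope.launchInnerThing(channel: Channel<String>): Job = launch {
    while (true) {
        delay(50)
        channel.send("inner coro")
    }
}

fun main() = runBlocking {
    val channel = Channel<String>(2)
    val inner = launchInnerThing(channel) // a child of this runBlocking scope
    repeat(3) { println("Hello ${channel.receive()}") }
    inner.cancel() // without this, runBlocking would wait for the infinite loop
}
```

The launched coroutine remains a child of whichever scope the function is called on, so cancelling that scope also cancels it.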
Also a reminder: `coroutineScope` (https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/coroutine-scope.html) and `CoroutineScope` (https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope.html) are very different. The latter can cause all sorts of issues that @louiscad is referring to and should only be used when the created scope will be managed by something else with a managed lifecycle, as described with activities here: https://kotlinlang.org/docs/coroutine-context-and-dispatchers.html#coroutine-scope