# coroutines
t
hey folks, I have what is probably a naive question, while I'm learning about Kotlin's coroutines. I was confused by the behaviour of switching between two different ways of launching a sub-coroutine, and wanted to make sure I understood the consequences properly. My scenario is that I have a long-running coroutine listening to a channel and acting on it, e.g. `for (msg in channel) { ... }`. This initial coroutine was created via `CoroutineScope(Dispatchers.IO).launch { .. }`. Before entering that loop, I launch another coroutine, which regularly `send()`s messages into the channel, e.g. `while (true) { delay(..); channel.send(..) }`. Initially I was creating the second coroutine with `coroutineScope { launch { ... } }`, but later switched to `CoroutineScope(Dispatchers.IO).launch { .. }`.
I switched because in the first version, it felt like this coroutine was able to block the other one! If the channel capacity was exceeded, this second coroutine starts to block on the `channel.send()` call, which is expected. However, it also seemed to be blocking my first coroutine too, which I did not expect. Can you help me understand what was happening here? Also, I'm concerned about whether my second coroutine will still be properly considered a child of the first coroutine, so that it gets cancelled if the parent is cancelled.
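A minimal sketch of the suspension described above, assuming kotlinx.coroutines is on the classpath. `bufferDemo` and the event strings are made up for illustration; the point is only that `send()` on a full `Channel(2)` suspends the producer until a consumer starts receiving.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records what happens when a producer outpaces its consumer.
fun bufferDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val channel = Channel<Int>(2)      // buffer holds two elements

    launch {
        for (i in 1..4) {
            channel.send(i)            // suspends (does not block the thread) when the buffer is full
            events += "sent $i"
        }
        channel.close()
    }

    delay(100)                         // give the producer time to fill the buffer
    events += "consumer starts"
    for (i in channel) events += "received $i"
    events
}
```

The first three entries should be `sent 1`, `sent 2`, `consumer starts`: the producer fills the two-slot buffer, then stays suspended in its third `send()` until the loop begins. How the later `sent` and `received` entries interleave is left to the scheduler.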
j
`coroutineScope { launch { ... } }` is the same as ...
j
I'm finding this hard to follow without seeing the actual code. Note that with `CoroutineScope().launch()` you usually need to capture the return value of `CoroutineScope()` so that you can clean it up if needed, unlike `coroutineScope { launch { } }`, where structured concurrency will handle most things for you.
t
Jeffrey: Oh! Right.. that's interesting. So it doesn't create a new `Job` that can run in parallel if you `launch` inside your own coro scope?
e
when `coroutineScope { ... }` (or any other normal `suspend fun`) returns, everything it has launched has also completed. this is the structure in structured concurrency
`launch` isn't such a function; it's an extension on `CoroutineScope`, indicating that it launches a coroutine in there, not bounded by the function's own lifetime
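A small sketch of that rule, assuming kotlinx.coroutines; `loadAndLog`, `scopeDemo` and the log strings are hypothetical names used only here. The suspend function does not return until the child it launched inside `coroutineScope` has finished.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical suspend function that launches a child inside its own scope.
suspend fun loadAndLog(log: MutableList<String>) {
    coroutineScope {
        launch {
            delay(100)
            log.add("child finished")
        }
        log.add("scope body finished")
    }
    // Control only reaches this point after the launched child has completed.
}

fun scopeDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    loadAndLog(log)
    log.add("loadAndLog returned")
    log   // [scope body finished, child finished, loadAndLog returned]
}
```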
do not use `CoroutineScope(...).launch { }`. that results in breaking the parent-child job relations
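To make the broken parent-child relation concrete, here is a sketch (assuming kotlinx.coroutines; `cancellationDemo` is an illustrative name) that cancels a parent and then checks which of its two launches was cancelled with it.

```kotlin
import kotlinx.coroutines.*

// Returns (child cancelled?, detached cancelled?) after the parent is cancelled.
fun cancellationDemo(): Pair<Boolean, Boolean> = runBlocking {
    var child: Job? = null
    var detached: Job? = null

    val parent = launch {
        // Launched on the parent's scope: becomes a child of `parent`.
        child = launch { delay(60_000) }
        // Launched on a brand-new scope: its Job has no link to `parent`.
        detached = CoroutineScope(Dispatchers.Default).launch { delay(60_000) }
    }

    delay(50)                 // let the parent start both coroutines
    parent.cancelAndJoin()    // cancellation propagates to children only

    val result = Pair(child!!.isCancelled, detached!!.isCancelled)
    detached!!.cancel()       // manual clean-up that structured concurrency would have done
    result                    // (true, false)
}
```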
t
I had a feeling that was going to be the case (breaking the parent-child relationship). But I'm not quite sure of the right way to get an asynchronous job running, then, without blocking the parent? (Which is perhaps happening due to a side effect of something else..)
e
```kotlin
coroutineScope {
    launch { one() }
    coroutineScope {
        launch { two() }
        launch { three() }
    }
    launch { four() }
}
```
one+two+three may run concurrently. one+four may run concurrently. but two+three will never run concurrently with four; they must be complete before control flows to where four is launched. (regardless of whether you split out some parts of this into other functions or not)
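A runnable version of that snippet, as a sketch: `step` and `orderingDemo` are hypothetical stand-ins for `one()` through `four()` that just log and `delay`, and the durations are arbitrary.

```kotlin
import kotlinx.coroutines.*

fun orderingDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    // Stand-in for one()..four(): logs, suspends for a while, logs again.
    suspend fun step(name: String, millis: Long) {
        log += "$name start"
        delay(millis)
        log += "$name end"
    }

    coroutineScope {
        launch { step("one", 300) }
        coroutineScope {
            launch { step("two", 100) }
            launch { step("three", 100) }
        }   // suspends here until two and three are both done
        launch { step("four", 100) }
    }
    log
}
```

Whatever the exact interleaving, `four start` can only be logged after both `two end` and `three end`, while `one` is still running at that point.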
t
That does sound like what I'm seeing.
Jacob asked for an example, so here's some code:
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

suspend fun outer(channel: Channel<String>) {
    //    coroutineScope { launch {
    CoroutineScope(Dispatchers.IO).launch {
        inner(channel)
    }

    for (message in channel) {
        println("Hello ${message}")
    }
}

suspend fun inner(channel: Channel<String>) {
    for (i in 1..10) {
        delay(1000)
        channel.send("inner coro")
    }
    channel.close()
}

runBlocking {
    val chan = Channel<String>(2)
    CoroutineScope(Dispatchers.IO).launch {
        outer(chan)
    }
    delay(11000)
}
```
If I shouldn't be using the `CoroutineScope.launch` method of creating another coroutine, then do you mind helping me understand the correct method to use?
(or feel free to redirect me to a good bit of docs.. but I was staring at the Kotlin coroutines docs for hours already without quite making this come together in my mind)
j
```kotlin
suspend fun inner(channel: Channel<String>) {
    for (i in 1..10) {
        delay(1000)
        channel.send("inner coro")
    }
    channel.close()
}

runBlocking {
    val channel = Channel<String>(2)
    launch {
        inner(channel)
    }

    for (s in channel) {
        println("Hello $s")
    }
}
```
doesn’t work?
e
the immediate way to get rid of `CoroutineScope()` is
```kotlin
suspend fun outer(channel: Channel<String>): Unit = coroutineScope {
    launch(Dispatchers.IO) {
        inner(channel)
    }

    for (message in channel) {
        println("Hello ${message}")
    }
}

runBlocking {
    val chan = Channel<String>(2)
    withContext(Dispatchers.IO) {
        withTimeout(11000) {
            outer(chan)
        }
    }
}
```
and everything should run concurrently as expected
t
oh, for the sake of the example, I'm specifically trying to launch the inner routine from the outer routine. Imagine that from a caller's perspective, it only wants to know about the `outer()` function call.
(in reply to Jacob)
e
nothing about `outer` leaks past its lifetime to its caller - such as whether it `launch`es child coroutines
t
Thanks.. just digesting your changes, ephemient
OK, that mostly makes sense to me. In the (non-working) version of my example, I had `coroutineScope { launch { ... } }` and that changes to this in your code: `coroutineScope { launch(Dispatchers.IO) { ... } }`
So, by adding the dispatcher context to `launch()`, that is actually letting us have another parallel worker, unlike the plain `launch {` version?
I feel like there's some nuance here, so I'm just keen to understand it
e
no
`launch(context) { ... }` is equivalent to `launch { withContext(context) { ... } }`
if you have a function `f()` which blocks the thread, and you are currently executing in a single-threaded dispatcher (such as `runBlocking`), then both `f()` and `launch { f() }` will prevent that single thread from making progress
but `withContext(Dispatchers.IO) { f() }` will use a built-in threadpool
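A sketch of that difference, assuming kotlinx.coroutines. Here `f()` is a hypothetical blocking call built on `Thread.sleep`, and `blockingDemo` counts how often a sibling coroutine gets to run while `f()` is in progress.

```kotlin
import kotlinx.coroutines.*

// Hypothetical blocking function: holds its thread instead of suspending.
fun f() = Thread.sleep(200)

// Returns how many times a sibling coroutine ticked while f() was running.
fun blockingDemo(offload: Boolean): Int = runBlocking {
    var ticks = 0
    val ticker = launch {
        while (isActive) {
            delay(20)
            ticks++
        }
    }
    yield()                                  // let the ticker reach its first delay

    if (offload) {
        withContext(Dispatchers.IO) { f() }  // f() blocks an IO thread; runBlocking's thread stays free
    } else {
        f()                                  // f() blocks runBlocking's only thread
    }

    val seen = ticks
    ticker.cancelAndJoin()
    seen
}
```

`blockingDemo(offload = false)` returns 0 because the ticker never gets the thread while `f()` sleeps; with `offload = true` the ticker keeps running and the count is positive.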
t
ah, got it! However, that adds a bit of confusion -- because in my example, I didn't think there was anything that was actually hard-blocking like that. as in, `delay()` and `channel.send()` are both suspend functions, yeah?
e
they are
I assumed that was a cut-down example
t
It's slightly cut down, but it does demonstrate my issue -- that the two loops don't seem to run concurrently, even though both are suspend functions and not hard blocking
e
maybe it only looked stuck because your program was never terminating (due to the `CoroutineScope(...).launch {}`)?
https://pl.kotl.in/Mzszu6nPI shows that even `runBlocking` works, without `IO`
t
Maybe, but I thought I'd see I/O output while it was running, even if it never terminated?
e
might be buffered by whatever's running your program (such as Gradle)
t
it's unbuffered, if I use the version with Dispatchers.IO
I'll play around for a bit and see if there is something in that concept though -- that it's stuck waiting for something to terminate
Thanks for your help so far -- appreciate your time explaining this
e
sure. it does seem that everybody comes into this with a different intuition at first, which doesn't always match how kotlinx.coroutines was designed to behave
t
I know all the ins and outs and oddities of Akka Actors. 😂
Coming from Scala's concurrency models, it does take a bit of a shift to get used to Kotlin's
e
from what I know, it's using the same model as Java, so unstructured concurrency - anything goes, anywhere
but even Java is coming around to realize that structured concurrency is necessary for being able to reason sanely about large systems
https://openjdk.org/jeps/428 I haven't seen anything for Scala though
t
Scala's Akka's Actors were a bit like coroutines and channels.. All the Actors ran in the same single thread, unless you specifically told them to run in another context. Actors were meant to be non-blocking. Actors would process a message off their incoming queue (like a kotlin channel) and send results out in the same way.
A supervisor would be watching all the incoming letterboxes for Actors, and choose which one would get to run to pick up a message to process it.
Actors were very lightweight to create and have exist, like coroutines.
e
I mean in that an actor passing work to other actors doesn't retain any chain of ownership (afaik)
t
An Actor can create child actors, and those work like structured concurrency, where if the parent actor exits/dies, the children are automatically cleaned up.
(And the parent receives notifications about deaths of its children)
e
I feel like it's hard to explain this - you can implement actors on coroutines, but the other way around doesn't make much sense
t
It's OK, I'm more mentioning this in case you were curious about where my biases come from 🙂
I'd like to write idiomatic Kotlin coroutines, rather than just implement Scala style on them
For what it's worth.. I have the simple example behaving properly, with just the `coroutineScope { launch { ... } }` methods now, but my large system still hangs unless I use `CoroutineScope(Dispatchers.Default).launch { .. }`
It's interesting that it's fine with the Default dispatcher though; doesn't need the threading that comes from the IO one.
e
`Dispatchers.Default` and `Dispatchers.IO` actually share threads, so that "switching" from one dispatcher to the other can be optimized
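One way to observe this, as a sketch assuming kotlinx.coroutines on the JVM (`threadNames` is an illustrative name): record the thread name on `Dispatchers.Default`, then again after switching to `Dispatchers.IO`.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Returns the thread name seen on Default and the one seen after switching to IO.
fun threadNames(): Pair<String, String> = runBlocking {
    withContext(Dispatchers.Default) {
        val onDefault = Thread.currentThread().name
        val onIo = withContext(Dispatchers.IO) { Thread.currentThread().name }
        onDefault to onIo
    }
}
```

Both names come from the same `DefaultDispatcher-worker-N` pool, and the switch typically does not move to another thread at all, although that is not guaranteed.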
t
The issue does seem to be that the `launch {}` wants to wait for the routine inside it to complete, yet in my system, that's an infinite loop (until cancelled)
j
The Default dispatcher has threads too. It just usually has far fewer. A common setup is 4 for Default (it is sized to the number of CPU cores) and 64 for IO (its default limit).
t
Ah.. right.
My child (like the `inner()` function in my example) is basically just a `delay; channel.send()` like my example. But I never see the code get past the `launch {}` that kicks it off. Which makes sense from the structured concurrency thing mentioned earlier I guess. But I guess I don't know what the right way is to have a properly asynchronous child coroutine?
j
`launch` is the right way, as in the previous example
t
What is the significance of having coroutineScope twice, in the snippet there?
e
it introduces a child scope (just like `withContext(EmptyCoroutineContext)`), and is the intended way to use the `CoroutineScope.*` extensions inside a `suspend fun`
t
So I think I've worked out what is different between the simple example I pasted and my real code.. and it's that in the example, `launch { inner() }` is called in `outer()`. Whereas in my real code, it was the equivalent of:
```kotlin
suspend fun outer() {
    launchInnerThing()   // does not return while the loop below is running
    doStuff()            // placeholder for the rest of outer's work
}

suspend fun launchInnerThing() {
    coroutineScope {
        launch {
            while (true) { /* do stuff */ }
        }
    }
}
```
e
yes, as we said previously, `coroutineScope { launch { ... } }` does not return until the body of the launch is complete
t
That's why I was checking about the significance of `coroutineScope` occurring twice in your own example.. I hadn't quite made the connection
j
but you can redefine launchInnerThing as `fun CoroutineScope.launchInnerThing()` and lose the inner coroutineScope builder (and DON’T mark it as suspend!)
e
you can but I don't think that conveys the intent very well, and it makes it harder for the caller to actually know what the bounds are
in general, just like non-suspend functions, a suspend function should not silently continue to do things after it has returned. I'd rather have a `suspend fun runInner()` that callers will `launch {}` if they want to perform other actions concurrently with it
to Jacob's point though: the convention in kotlinx.coroutines is `suspend fun` XOR `CoroutineScope` receiver, never both. https://elizarov.medium.com/structured-concurrency-722d765aa952
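A sketch of the `suspend fun runInner()` style, assuming kotlinx.coroutines. The `count` parameter, the short delay and the returned list are additions for illustration, not part of the original example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Suspends until all messages are sent; starts nothing that outlives the call.
suspend fun runInner(channel: Channel<String>, count: Int) {
    repeat(count) {
        delay(10)
        channel.send("inner coro")
    }
    channel.close()
}

// The caller opts into concurrency by wrapping the call in launch.
suspend fun outer(channel: Channel<String>): List<String> = coroutineScope {
    launch { runInner(channel, 3) }

    val received = mutableListOf<String>()
    for (message in channel) received += "Hello $message"
    received
}

fun runOuterDemo(): List<String> = runBlocking {
    outer(Channel<String>(2))
}
```

`outer` still returns only once `runInner` has finished, so nothing leaks past the call; the producer and the receive loop simply overlap inside it.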
j
I’d reverse the phrasing of that: in general, just like suspend functions, a non-suspend function should… but otherwise 👍
t
I agree -- it's not good behaviour for a function to just decide to keep running in the background. My bad for writing it! I am just experimenting around, and hadn't realised the significance of suspend vs non-suspend functions vs coroutine scope.
I'm learning a lot from this discussion though
d
You might actually be better off with a Flow rather than a Channel.
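For comparison, a sketch of the same producer written as a cold `Flow`, assuming kotlinx.coroutines 1.6 or newer; `messages`, `count` and `flowDemo` are illustrative names. No channel, `launch` or `close()` is needed, because the body only runs while it is being collected.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Cold stream: each collector runs this body from the start.
fun messages(count: Int): Flow<String> = flow {
    repeat(count) {
        delay(10)
        emit("inner coro")
    }
}

fun flowDemo(): List<String> = runBlocking {
    messages(3).toList()   // or: messages(3).collect { println("Hello $it") }
}
```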
l
`CoroutineScope(…).launch { }` can lead to silent malfunction if any underlying callback happens to be registered in a WeakReference (something you might not know): https://github.com/Kotlin/kotlinx.coroutines/issues/1061
t
Thanks for the tip
j
Sorry Toby I was sleeping before answering your question but I guess you got a good conversation there already. One thing I haven't seen clearly mentioned, and which might be the source of your confusion, was that the following does express concurrency like you want to:
```kotlin
coroutineScope {
    launch {
        doStuff1()
    }
    doStuff2()
}
```
In this case `doStuff1` and `doStuff2` run concurrently, because `coroutineScope` only waits for the launch at the end of the block. That's how you construct hierarchies of concurrent coroutines. Ephemient had mentioned it but I'm not sure it was clear for you.
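A runnable sketch of that shape, assuming kotlinx.coroutines; the log strings and delays stand in for the real `doStuff1()` and `doStuff2()`, and `concurrencyDemo` is a made-up name.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun concurrencyDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    coroutineScope {
        launch {                       // stand-in for doStuff1()
            log.add("doStuff1 start")
            delay(200)
            log.add("doStuff1 end")
        }
        log.add("doStuff2 start")      // stand-in for doStuff2(), runs in the scope body
        delay(50)
        log.add("doStuff2 end")
    }                                  // waits here for the launched coroutine
    log.add("scope returned")
    log
}
```

Under `runBlocking` this yields `doStuff2 start`, `doStuff1 start`, `doStuff2 end`, `doStuff1 end`, `scope returned`: the two overlap, and the scope returns only after both are done.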
t
I think the root cause of my confusion is the way that the placement of coroutineScope and launch matters, depending on whether it's inside a separate suspend function. (At least, that was one of the things I've taken away from the conversation. And need to experiment with further to truly understand.)
j
Mmmh it doesn't matter whether you extract `coroutineScope` into a suspend function (I mean it doesn't change the behaviour). It is itself a suspend function. What matters is the placement of code inside or outside the `launch` or the `coroutineScope` block
t
I think I am, slowly, starting to get it. So, a `coroutineScope { ... }` is essentially a blocker, that won't return until everything inside it has completed -- including if what it contains are `launch {...}` statements. If you want two coroutineScopes to be running in parallel, they need to be inside `launch` statements at a higher scope. ie
```kotlin
runBlocking {
    launch {
        coroutineScope {
            launch { verySlowThing() }
            doAnotherRelatedThing()
        }
    }
    launch {
        coroutineScope {
            launch { oneThing() }
            launch { otherRelatedThing() }
        }
    }
}
```
And in fact, I don't need the `coroutineScope` inside `launch` if I'm only doing one thing in there.
This leads me to another question though.. I can't just call `launch {` inside a regular suspend function -- I need to have the CoroutineScope available. This kinda forces me to use `suspend fun FooBar() = coroutineScope {` though, which then has the effect of requiring everything inside the function to complete before the whole function returns -- including things I launched. I'm beginning to understand how this is intentional now, but also leads me to wonder: Is there a way to get access to `launch` without forcing me to create a new coroutineScope?
I get the feeling this might be tricky to do by design, to really encourage the structured concurrency thing. I'm fine if that is the case, but I thought I should check.
l
Yes, but make sure there's still a strong reference to this `CoroutineScope(…)`, or to the result of its only `launch` (its `Job`), or you risk the GC coming for it. And mind how exception handling and cancellation propagation aren't going to work the same.
Cancellation and error handling basically need to be handled beyond the scope of the function when you're not using a local `coroutineScope { … }`, so you need to think more about it to avoid having uncaught exceptions that crash your program, or coroutines that keep running when they should have been cancelled.
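One common shape for this, sketched under assumptions: `Poller`, `pollerDemo` and the polling loop are hypothetical, and kotlinx.coroutines is assumed. The owning object keeps the scope in a field (a strong reference) and exposes an explicit `close()` that cancels everything it launched.

```kotlin
import kotlinx.coroutines.*

// Hypothetical component that owns its scope and must be closed by its owner.
class Poller {
    // Held in a field, so the scope stays strongly reachable as long as the Poller is.
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    @Volatile
    var polls = 0
        private set

    fun start(): Job = scope.launch {
        while (isActive) {
            delay(10)
            polls++
        }
    }

    // Cancels every coroutine launched in this scope.
    fun close() {
        scope.cancel()
    }
}

fun pollerDemo(): Pair<Boolean, Int> = runBlocking {
    val poller = Poller()
    val job = poller.start()
    delay(200)
    poller.close()      // nothing else would ever stop the loop
    job.join()
    Pair(job.isCancelled, poller.polls)
}
```

Forgetting to call `close()` leaves the loop running for the life of the process, which is the trade-off compared with a local `coroutineScope { … }`.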
t
Thanks. I hope that's an area I have much more familiarity with -- ie. I'm familiar with managing worker threads, which sound like they have the same issues of error and cancellation management
j
"but also leads me to wonder: Is there a way to get access to launch without forcing me to create a new coroutineScope?"
Yes, as I mentioned earlier, you can make your function an extension function on CoroutineScope and not suspend. Also see https://elizarov.medium.com/coroutine-context-and-scope-c8b255d59055
If you need to launch a coroutine that keeps running after your function returns, then make your function an extension of CoroutineScope or pass scope: CoroutineScope as parameter to make your intent clear in your function signature. Do not make these functions suspending:
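A sketch of that convention, assuming kotlinx.coroutines. `launchInnerThing` reuses the name from earlier in the thread; the channel parameter, the returned `Job` and `extensionDemo` are additions for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Not suspending: returns at once, and the coroutine lives in the receiver scope.
fun CoroutineScope.launchInnerThing(channel: Channel<String>): Job = launch {
    while (true) {
        delay(10)
        channel.send("inner coro")
    }
}

fun extensionDemo(): List<String> = runBlocking {
    val channel = Channel<String>(2)
    val producer = launchInnerThing(channel)   // child of runBlocking's scope
    val received = List(3) { channel.receive() }
    producer.cancelAndJoin()                   // otherwise runBlocking would wait forever
    received
}
```

The receiver in the signature tells callers that something keeps running after the function returns, and that the scope they pass in decides when it stops.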
Also a reminder: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/coroutine-scope.html and https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope.html are very different. The latter can cause all sorts of issues that @louiscad is referring to and should only be used when the created scope will be managed by something else with a managed lifecycle, as described with activities here: https://kotlinlang.org/docs/coroutine-context-and-dispatchers.html#coroutine-scope