What is the proper way to use Dispatchers and coro...
# coroutines
j
What is the proper way to use Dispatchers and coroutine scope? I just fixed a problem where coroutines would get created but not run. In the code below, 10000 elements each spawn a coroutine; it performs a DB call (blocking) and then spawns another coroutine for an API call (suspending). If they are both on `Dispatchers.IO`, then `Launched` is printed to the console ~10000 times before a single `Got it` is printed. If one of the `Dispatchers.IO` is changed, then you see the two print statements intertwined.
fun main() {
    runBlocking {
        launch {
            Script.run()
        }.join()
    }
}

object Script {
    suspend fun run() {
        val channel = Channel<Int>()
        CoroutineScope(Dispatchers.Default).launch {
            channel.consumeEach {
                println("Launched")
                launch(Dispatchers.IO) {
                    delay(100) // Suspending Api call
                    println("Got $it")
                }
            }
        }
        IntRange(1, 10000).map {
            CoroutineScope(Dispatchers.IO).launch {
                Thread.sleep(100) // blocking jdbc call
                channel.send(it)
            }
        }.joinAll()
    }
}
j
There's a lot to unpack here
First,
suspend fun main() {
    Script.run()
}
Next, never create orphaned `CoroutineScope`s. Your entire implementation of `run` can be wrapped in `coroutineScope { }` and then all your dispatcher usage moved to parameters on `launch`. This also lets you get rid of the `joinAll`, as a coroutine scope cannot finish until all its children have.
`launch` inside a `launch` is very suspicious. You probably want `withContext` to take the coroutine and move it to a new dispatcher.
Although, I guess you're doing that inside a `consumeEach` because you want them to all run in parallel? You don't actually need to move the coroutine collecting the channel to a different dispatcher (although it has no real effect other than slowing things down).
Finally, the reason you're seeing all of the launches before any "got it"s is that a dispatcher is a set of threads and a queue. You are enqueuing 10000 items and then, as those items send data to the channel, you are appending 10000 more items to the queue. Thus, the first 10000 need to run before the second 10000 can run. Both the blocking JDBC call and the suspending API call take 100 wall-clock milliseconds: in this respect `Thread.sleep` and `delay` on the IO dispatcher behave the same way, each holding back the statements that follow it by 100 wall-clock milliseconds.
if you reduce the delay time you'll see them start to interleave
here's an example: https://pl.kotl.in/hVXAinZzh
Note that I kept `joinAll()` because I needed to defer closing the channel until after all of those coroutines finished. This in turn allows the channel-consuming coroutine to complete.
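For readers without access to the playground link, here is a minimal sketch of what the advice above could look like when applied to `Script.run()`. It is not the code behind the link. The name `runStructured`, the `count` parameter, and the `received` list are assumptions added for illustration, and the item count is reduced so it finishes quickly.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import java.util.Collections

// Hypothetical rewrite of Script.run(): one coroutineScope, no orphaned CoroutineScope(...).
suspend fun runStructured(count: Int = 100): List<Int> = coroutineScope {
    val channel = Channel<Int>()
    // Thread-safe list so the result can be inspected afterwards (illustration only).
    val received: MutableList<Int> = Collections.synchronizedList(mutableListOf<Int>())

    // Consumer: a child of this scope; it stays on the caller's dispatcher.
    launch {
        channel.consumeEach { value ->
            println("Launched")
            // One child per message so the API calls can overlap.
            launch(Dispatchers.IO) {
                delay(100) // stands in for the suspending API call
                println("Got $value")
                received.add(value)
            }
        }
    }

    // Producers: the dispatcher is passed to launch instead of creating a new scope.
    (1..count).map { value ->
        launch(Dispatchers.IO) {
            Thread.sleep(100) // stands in for the blocking JDBC call
            channel.send(value)
        }
    }.joinAll()

    // joinAll() is kept only so the channel is closed after every producer has sent.
    channel.close()

    // coroutineScope does not return until the consumer and its children are done,
    // so the caller sees the list fully populated.
    received
}
```

Calling it from `suspend fun main() { println(runStructured().size) }` should be enough to try it; no explicit `join` is needed at the call site because the scope waits for its children.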
j
What Jake said. Then a few other things. `IntRange(1,1000)` could be `(1..1000)`. But also `(1..1000).map { ... }` could just be `List(1000) { ... }`, or even just `repeat(1000)` if you didn't need to join all created jobs. And you don't: just use `coroutineScope` around it and it will wait for children. Then, the whole thing seems to be a manual implementation of a channel flow. You could use `channelFlow { ... }` instead to launch your JDBC coroutines to emit to the flow, and then collect the flow.
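A rough sketch of the `channelFlow` suggestion, under assumptions: `loadRow` is a hypothetical stand-in for the blocking JDBC call, and `rows` and `collectRows` are names invented here. The flow completes on its own once every coroutine launched inside the builder has finished, so there is no manual `joinAll()` or `close()`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList

// Hypothetical stand-in for the blocking JDBC call.
fun loadRow(id: Int): Int {
    Thread.sleep(10)
    return id
}

// Each row is loaded in its own coroutine on Dispatchers.IO and sent into the flow.
fun rows(count: Int): Flow<Int> = channelFlow {
    repeat(count) { index ->
        launch(Dispatchers.IO) {
            send(loadRow(index + 1))
        }
    }
    // No explicit close: channelFlow completes when the launched children finish.
}

// Collecting the flow; toList() is used here only to make the result easy to check.
suspend fun collectRows(count: Int): List<Int> = rows(count).toList()
```

The values arrive in whatever order the IO threads finish, so the collected list is not guaranteed to be sorted.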
j
Lots of good information, thank you. Starting from the top… `suspend fun main()` makes a coroutine context, `suspend fun run() = coroutineScope {` wraps that context in a scope so we can now use coroutines. The scope is like a container and the coroutines it creates can be run on any dispatcher. I’m not totally clear on the difference between `CoroutineScope()` and `coroutineScope {}`. `coroutineScope` is a higher-order function, so we just pass a lambda. When would you use `CoroutineScope(...)`? Orphaned coroutine scopes: I don’t totally understand this. Does my example have 10000 + 1 or 2 orphans? I haven’t learned about flows yet, but it’s on the docket. Here is my implementation and why I have a launch inside a launch: https://pl.kotl.in/TxcoBsPL_ I’m dealing with a rate-limited API. In the example above I have a coroutine that receives calls (the real call is a suspending API call) via a channel, executes 5, waits 1 second, executes 5 more, and so on.
j
The `coroutineScope { }` function joins all the jobs created inside its lambda before it returns, creating what's known as structured concurrency. Errors propagate upwards. Cancellation propagates downwards. It forms a tree of jobs that is easy to reason about. The `CoroutineScope(...)` constructor function creates a completely separate set of coroutines, detached from the calling location. Exceptions don't bubble up normally. Cancellation of the caller does not affect these scopes. Your code does not wait for them to complete automatically.
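The "does not wait" part of that difference can be sketched as follows. The function names `structured` and `detached` are invented for this illustration, and the 500 ms delay is an arbitrary value chosen so the detached coroutine is still running when the flag is read.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicBoolean

// coroutineScope { } returns only after its children complete.
suspend fun structured(): Boolean {
    val finished = AtomicBoolean(false)
    coroutineScope {
        launch {
            delay(500)
            finished.set(true)
        }
    }
    // The child has already completed by the time we get here.
    return finished.get()
}

// CoroutineScope(...).launch starts a detached coroutine; the caller moves on immediately.
suspend fun detached(): Boolean {
    val finished = AtomicBoolean(false)
    val job = CoroutineScope(Dispatchers.Default).launch {
        delay(500)
        finished.set(true)
    }
    // Read right after launching: the detached coroutine is still inside delay().
    val seenBeforeJoin = finished.get()
    // Without this explicit join nothing would wait for the detached coroutine.
    job.join()
    return seenBeforeJoin
}
```

Here `structured()` reports that the work finished, while `detached()` reports that it had not finished at the point where the caller carried on. Waiting for it took an explicit `join()`.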
j
Ok that makes sense. Would that approach be the same if you were running a server? Pseudocode(ish) eg
controller = coroutineScope {
    // start 10 API calls concurrently; the scope waits for all of them before returning
    List(10) { async { callApi() } }
}
Is there a name for the “feature” that allows the variable `coroutineContext` to be accessible inside a `coroutineScope`? I’ve seen that pattern a lot, but I don’t know what it’s called or how to write something that could do it myself. In regards to the launch in a launch, it’s essentially an actor listening for new messages and then spawning a process. `withContext(Dispatchers.IO)` would stop it from processing new messages until it finished that one. I could pass the scope or context of the calling coroutine and then use that, but I think there would still be a launch in a launch.
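The trade-off described here (`launch` per message versus `withContext` per message) can be sketched by counting how many handlers run at once. Everything in this sketch is hypothetical: `peakConcurrency`, its parameters, and the 100 ms `delay` standing in for the real work.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import java.util.concurrent.atomic.AtomicInteger

// Returns the highest number of message handlers that were running at the same time.
suspend fun peakConcurrency(useLaunch: Boolean, messages: Int = 5): Int = coroutineScope {
    // Pre-fill an unlimited channel and close it so the consumer loop terminates.
    val channel = Channel<Int>(Channel.UNLIMITED)
    repeat(messages) { channel.send(it) }
    channel.close()

    val active = AtomicInteger(0)
    val peak = AtomicInteger(0)

    // Stand-in for handling one message; records how many handlers overlap.
    suspend fun handle() {
        val now = active.incrementAndGet()
        peak.updateAndGet { maxOf(it, now) }
        delay(100)
        active.decrementAndGet()
    }

    val consumer = launch {
        channel.consumeEach {
            if (useLaunch) {
                // Starts the handler and immediately goes back for the next message.
                launch(Dispatchers.IO) { handle() }
            } else {
                // Suspends the consumer loop until this handler is done.
                withContext(Dispatchers.IO) { handle() }
            }
        }
    }
    // join() also waits for the handlers launched inside the consumer.
    consumer.join()
    peak.get()
}
```

With `useLaunch = false` the peak stays at 1, because the loop cannot receive the next message while `withContext` is running. With `useLaunch = true` the handlers overlap, which is the actor-style behavior described above.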