Andrea Giuliano
08/26/2020, 2:58 PM
    suspend fun doSomething() = coroutineScope {
        val channel: Channel<Int> = Channel(capacity)
        val results = aggregator.aggregate(channel)
        channel.close()
        results
    }
where aggregator is a class that exposes a suspending function aggregate that uses the channel to listen to messages with channel.receive().
Now, would channel.close() ever be invoked before the aggregator finishes receiving all the messages? Or does not creating a new scope (by using launch or async, for example) guarantee that each line of my snippet is executed only after the previous one finishes? And if that’s the case, what’s the purpose of marking a function like aggregate() with a suspend modifier (although it does suspend, since it will wait for messages in the channel)?
Zach Klippenstein (he/him) [MOD]
08/26/2020, 4:16 PM
“would channel.close() ever be invoked before the aggregator finishes receiving all the messages?”
Not if aggregate is doing structured concurrency right and not leaking any coroutines. To which point, calling close (or cancel) on such a channel is unnecessary.
“Or does not creating a new scope (by using launch or async, for example) guarantee that each line of my snippet is executed only after the previous one finishes?”
Yes. Although, again, if aggregate is being naughty and launching new coroutines that don’t follow scoping rules (e.g. using GlobalScope), then those coroutines could be running after aggregate returns. But if aggregate is just looping over channel.receive(), for instance, then it’s well-behaved and your assumption would be correct.
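A minimal sketch of the well-behaved case, assuming kotlinx-coroutines-core is on the classpath. The aggregate function below is hypothetical (the real aggregator isn't shown in this thread), and the channel is pre-filled only so that the example terminates:

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel

// Hypothetical aggregator: it only suspends on receive() and starts no
// coroutines of its own, so it returns only once it has finished.
suspend fun aggregate(channel: ReceiveChannel<Int>, count: Int): Int {
    var sum = 0
    repeat(count) { sum += channel.receive() }
    return sum
}

suspend fun doSomething(): Int = coroutineScope {
    val channel = Channel<Int>(capacity = 3)
    // Pre-fill the buffer; with capacity 3 these sends do not suspend.
    for (i in 1..3) channel.send(i)
    val result = aggregate(channel, count = 3) // suspends until aggregate returns
    channel.close() // reached only after aggregate has returned
    result
}

fun main() = runBlocking {
    println(doSomething()) // prints 6
}
```

Because nothing here launches a new coroutine, the lines of doSomething run strictly one after another, and close() cannot happen before aggregate is done.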
“what’s the purpose of marking a function like aggregate() with a suspend modifier?”
You answered your own question: “it suspends since it will wait for messages in the channel”.
Another question however might be, why do suspend modifiers need to be propagated up the call chain if methods are only doing sequential things anyway? And the answer to that is because if you call a method that calls a method that suspends, everything after that innermost suspension point needs to be wrapped up into the continuation (the “callback”) that will be resumed when the suspending call resumes. That requires every method up the chain to have the state machine infrastructure that the Kotlin compiler uses to actually implement continuations. And the only way the compiler knows to generate that state machine infra is by the suspend modifier on each of these methods.
Another, less technical, answer is that it serves as a documentation hint to people reading this method signature that it (probably) won’t do any blocking work on the current thread, so it’s safe to call, e.g., from somewhere like the Android main thread.
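To make the continuation point concrete, here is a rough sketch that uses only the low-level kotlin.coroutines API from the standard library. The names awaitValue, middle, pending, log and demo are made up for illustration; real code would normally use kotlinx.coroutines rather than handling continuations by hand:

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

var pending: Continuation<Int>? = null
val log = mutableListOf<String>()

// Innermost suspension point: parks the caller and hands out its continuation.
suspend fun awaitValue(): Int = suspendCoroutine { cont -> pending = cont }

// Needs `suspend` only because it calls awaitValue(). Everything after that
// call is what runs when the stored continuation is resumed.
suspend fun middle(): Int {
    log += "before"
    val v = awaitValue()
    log += "after $v"
    return v * 2
}

fun demo(): Int {
    var result = 0
    val block: suspend () -> Int = { middle() }
    // Runs middle() up to the suspension point, then returns to the caller.
    block.startCoroutine(Continuation(EmptyCoroutineContext) { r -> result = r.getOrThrow() })
    log += "suspended"
    pending!!.resume(21) // runs the rest of middle() and then the completion callback
    return result
}

fun main() {
    println(demo()) // prints 42
    println(log)    // prints [before, suspended, after 21]
}
```

The "after" half of middle only runs once resume is called, which is the part the compiler has to package into a state machine for every function in the chain.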
Andrea Giuliano
08/26/2020, 4:28 PM
“Not if aggregate is doing structured concurrency right and not leaking any coroutines. To which point, calling close (or cancel) on such a channel is unnecessary.”
Does that mean that channels get closed automatically when there is no reference to them? Also, when you say “doing structured concurrency right”, imagine that aggregator is part of a library and I don’t really know what it does, so it could do some launch() inside it. I believe in that case the channel would be closed earlier in my snippet, is that right? So would you suggest just not closing the channel and leaving it to be garbage collected?
Zach Klippenstein (he/him) [MOD]
08/26/2020, 4:42 PM
“Does that mean that channels get closed automatically when there is no reference to them?”
No, just that there’s no need to close a channel if you know there aren’t any coroutines waiting for it to be closed. A channel isn’t a low-level resource like a file that’s managed by the OS and must always be closed. It’s just a queue, and if nobody’s reading from the queue anymore, it’s fine to just let it be garbage-collected without closing it.
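For contrast, a small sketch of the case where closing does matter, i.e. a coroutine is waiting for the channel to be closed. It assumes kotlinx-coroutines-core, and sumUntilClosed and runDemo are hypothetical names:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel

// Iterating a channel with `for` ends only when the channel is closed.
suspend fun sumUntilClosed(channel: ReceiveChannel<Int>): Int {
    var sum = 0
    for (value in channel) sum += value
    return sum
}

fun runDemo(): Int = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    val consumer = async { sumUntilClosed(channel) }
    listOf(1, 2, 3).forEach { channel.send(it) }
    channel.close() // needed here: without it the consumer would wait forever
    consumer.await()
}

fun main() {
    println(runDemo()) // prints 6
}
```

If the consumer instead calls receive() a fixed number of times and then returns, nobody is waiting on the close signal and the channel can simply be dropped.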
“imagine that aggregator is part of a library and I don’t really know what it does”
If it’s doing something bad, I would consider that a bug and file a bug report to the library. If the library really wants to, it can run as many coroutines as it wants that you have no control over, whether or not you close the channel.
[…] channel.close() there would probably come with a comment explaining that it’s working around some specific bug in the library, with a link to the bug report 😂 (or some other comment explaining why it’s actually necessary)
Andrea Giuliano
08/26/2020, 4:46 PM
Zach Klippenstein (he/him) [MOD]
08/26/2020, 4:46 PM
[…] produce coroutine builder).
Andrea Giuliano
08/26/2020, 4:47 PM
Zach Klippenstein (he/him) [MOD]
08/26/2020, 4:47 PM
Andrea Giuliano
08/26/2020, 4:48 PM
Zach Klippenstein (he/him) [MOD]
08/26/2020, 4:48 PM
Andrea Giuliano
08/26/2020, 4:49 PM
[…] aggregate() method, although I’m doing receive, so I thought it would have been nice to have the suspend modifier; the compiler didn’t force me to do so. In that case, does it mean that the aggregate function would have been called “blocking” and only the receive would actually suspend? Is that correct?
Zach Klippenstein (he/him) [MOD]
08/26/2020, 4:54 PM
doSomething needs the suspend modifier because you call coroutineScope actually, not because of aggregate.
[…] aggregate actually has a CoroutineScope receiver, you don't need to call coroutineScope at all.
Andrea Giuliano
08/26/2020, 4:58 PM
suspend fun aggregate(channel: ReceiveChannel<T>):
it has the suspend modifier because I call channel.receive()
Zach Klippenstein (he/him) [MOD]
08/26/2020, 5:00 PM
[…] coroutineScope at all.
Andrea Giuliano
08/26/2020, 5:01 PM
Zach Klippenstein (he/him) [MOD]
08/26/2020, 5:03 PM
[…] coroutineScope if you need to call one of the coroutine builder functions (launch, async, etc.) in the current scope.
julian
08/28/2020, 7:46 PM
[…] coroutineScope in this thread https://kotlinlang.slack.com/archives/C1CFAFJSK/p1598198784089700