https://kotlinlang.org logo
#coroutines
Title
# coroutines
c

Czar

12/14/2019, 4:08 PM
Hi, I'm not very familiar with coroutines yet, but I'm doing a code review on code that uses them. I'm trying to figure out something:
someList.map { async { /* stuff */ }.forEach { it.await() }
My immediate reaction is to refactor it to
someList.map { async { /* stuff */ }.awaitAll()
But I see that
awaitAll
allocates an array, so am I actually hurting performance instead of improving things? Is
forEach { it.await() }
a more correct way to do
await
on a collection of
Deferred
?
a

Adam Powell

12/14/2019, 4:14 PM
if you're not going to store the results, why use async instead of launch?
c

Czar

12/14/2019, 4:15 PM
Huh, just occurred to me, performance aside, those two will fail differently if any of the `await`s fail. One will compute everything until it sequentially gets to the failing one, the other will fail as soon as the failure happens. @Adam Powell As I've said, I'm not familiar with coroutines, just reviewing someone else's code. Thanks for the pointer I'll look into
launch
, too.
a

Adam Powell

12/14/2019, 4:16 PM
plus the coroutines machinery is going to be doing a number of allocations along the way, if the work that you're trying to do concurrently isn't going to eclipse those costs and the cost of allocating a collection to hold the results you might as well do it without coroutines
k

Kevin Gorham

12/14/2019, 4:17 PM
exactly. The IDE will even warn you anytime you immediately await on an async--it can be reduced to just doing a
withContext
instead because you're not really awaiting.
c

Czar

12/14/2019, 4:19 PM
ok, the real code looks like this:
Copy code
@Transactional
fun processEvents() = runBlocking {
  eventRepository.listNextBatch()
    .map { async { processInNewTransaction(it) } }
    .forEach { it.await() }
}
maybe sequential await and async vs launch here is tied to some requirement I'm not aware of? But so far looks, like events are not related to each other and launch should work.
a

Adam Powell

12/14/2019, 4:22 PM
at a minimum I'd ask why this code is using async/await instead of launch/join, since the result of the await is being ignored
k

Kevin Gorham

12/14/2019, 4:22 PM
that
runBlocking
is also a smell. That's mostly intended for tests. The problems here seem a bit more fundamental--like the type that require a refactor.
a

Adam Powell

12/14/2019, 4:24 PM
also +1 to what @Kevin Gorham said; the
runBlocking
is highly suspicious; I'd expect it to be
suspend fun processEvents() = coroutineScope {
instead, so that the caller decides where it runs and how it's dispatched. As-is this is going to hard block a thread until suspending operations complete and that's rarely what you want.
c

Czar

12/14/2019, 4:25 PM
I see. Ok, I'll set aside some time to dive deeper into coroutines and schedule a re-review for the future. For now at least it seems that the code is fulfilling business requirements (tests pass and manual QA is happy) 🙂 So I'll leave this be and set a todo for later. Thanks for useful pointers.
👍 1
a

Adam Powell

12/14/2019, 4:28 PM
also useful to know is that when you
launch
(or
async
) into a surrounding coroutine scope like this code is doing (or like replacing
runBlocking {}
with
coroutineScope {}
would do) all child jobs must
.join()
before the scoping block will return. So explicitly joining the child jobs isn't even necessary, it'll be done automatically.
The
processInNewTransaction
function is marked
suspend
, right?
c

Czar

12/14/2019, 4:29 PM
speaking of caller, the method I showed is invoked like this in Spring app:
Copy code
@Scheduled(fixedDelay = EVENT_PROCESSING_BATCH_DELAY)
fun processEvents() {
  eventProcessor.processEvents()
}
@Adam Powell actually it is not
a

Adam Powell

12/14/2019, 4:30 PM
then what you have there is a very expensive
.forEach
🙂
c

Czar

12/14/2019, 4:31 PM
😄
a

Adam Powell

12/14/2019, 4:31 PM
because a
runBlocking
is going to run all of those asyncs on the same (calling) thread, and each one is going to block before it allows the next one to run
c

Czar

12/14/2019, 4:32 PM
are you sure?
k

Kevin Gorham

12/14/2019, 4:33 PM
Adam is right and I'd also be worried about any similar code this developer wrote that uses the
eventRepository
a

Adam Powell

12/14/2019, 4:33 PM
if
processInNewTransaction
was internally doing work that it could suspend during, you might see some benefits here, but if it's not farming off work into some sort of thread pooled dispatcher and awaiting the result itself, and the caller isn't doing a
withContext(SomeDispatcherBackedByAThreadPool)
to launch/async that work in parallel, then yeah, this is an expensive blocking loop running on the single calling thread.
Any time you see
runBlocking
, read it as, "run an event loop that concurrent jobs will post their continuation runnables to" - it allows for concurrency, not parallelism.
k

Kevin Gorham

12/14/2019, 4:36 PM
this developer seems to think that just by putting something inside an async block, it becomes suspending. That is sometimes what
async/await
means in other languages but that's not entirely how it works with coroutines.
c

Czar

12/14/2019, 4:37 PM
haha, you're right
a

Adam Powell

12/14/2019, 4:37 PM
well, in this case they're really only a
runBlocking(Dispatchers.Default)
parameter away from getting that sort of behavior
c

Czar

12/14/2019, 4:37 PM
Thanks!
a

Adam Powell

12/14/2019, 4:38 PM
but since that will start actually running the code in parallel, and this code already demonstrates some other misunderstandings, I'd be concerned that
processInTransaction
isn't actually thread-safe and might start breaking in the presence of actual parallelism
c

Czar

12/14/2019, 4:39 PM
processInTransaction is thread-safe, that I did check.
👍 1
a

Adam Powell

12/14/2019, 4:39 PM
hopefully not thread-safe by simply wrapping the whole logic in a
synchronized
block, then you'd be back to expensive serial execution again 😄
c

Czar

12/14/2019, 4:40 PM
no 🙂
👍 1
j

Joris PZ

12/14/2019, 6:13 PM
Well, it's actually a lifesaver that the current solution runs single-threaded because if that @`Transactional` is a Spring annotation, wouldn't it store the JDBC connection as a ThreadLocal? Which would then be not there anytime the scheduler would run one of those `async`s on a separate thread?
Or does
processInNewTransaction
actually start a new transaction as the name implies? Which would still be problematic if the coroutine ever gets suspended. And what's the point of
@Transactional
then? These few lines are quite fascinating! :-)
c

Czar

12/14/2019, 7:01 PM
that's as intended. aside from the async block, there's stuff in that method that should be taken care of in the parent transaction (I omitted those, as they weren't important to my question), while each async invocation should and is creating its own isolated transaction.
Regarding the
processInNewTransaction
, it constructs a TransactionTemplate and executes some logic via template's
execute
Yeah, fascinating indeed 🙂 I guess moral is one shouldn't use shiny new technologies without understanding how they do what they do or at least how they should be used for a specific case 🙂 I think the dev should've just moved the
processInNewTransaction
method to another component and declared it
@Async
🙂 but to re-iterate, coroutines are so new and shiny 🤩
14 Views