# coroutines
c
Hi, I'm not very familiar with coroutines yet, but I'm doing a code review on code that uses them. I'm trying to figure out something:
someList.map { async { /* stuff */ } }.forEach { it.await() }
My immediate reaction is to refactor it to
someList.map { async { /* stuff */ } }.awaitAll()
But I see that
awaitAll
allocates an array, so am I actually hurting performance instead of improving things? Is
forEach { it.await() }
a more correct way to do
await
on a collection of
Deferred
?
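A minimal sketch of the two forms side by side, assuming a hypothetical `square` function stands in for `/* stuff */`. The practical difference in the happy path is that `awaitAll()` hands back the results as a list, while the `forEach` form discards them.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for "/* stuff */": any suspending computation.
suspend fun square(x: Int): Int {
    delay(10)
    return x * x
}

// Awaits each Deferred in list order and throws the results away.
suspend fun awaitOneByOne(items: List<Int>): Unit = coroutineScope {
    items.map { async { square(it) } }.forEach { it.await() }
}

// Suspends until every Deferred completes; results keep the input order.
suspend fun awaitTogether(items: List<Int>): List<Int> = coroutineScope {
    items.map { async { square(it) } }.awaitAll()
}

fun main(): Unit = runBlocking {
    awaitOneByOne(listOf(1, 2, 3))
    println(awaitTogether(listOf(1, 2, 3))) // [1, 4, 9]
}
```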
a
if you're not going to store the results, why use async instead of launch?
c
Huh, it just occurred to me: performance aside, those two will fail differently if any of the `await`s fail. One will wait on each deferred in order until it gets to the failing one; the other will fail as soon as the failure happens. @Adam Powell As I've said, I'm not familiar with coroutines, just reviewing someone else's code. Thanks for the pointer, I'll look into
launch
, too.
a
plus the coroutines machinery is going to be doing a number of allocations along the way. If the work that you're trying to do concurrently isn't going to eclipse those costs and the cost of allocating a collection to hold the results, you might as well do it without coroutines
k
exactly. The IDE will even warn you any time you immediately await an async: it can be reduced to just doing a
withContext
instead because you're not really awaiting.
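A small sketch of that reduction, assuming a hypothetical CPU-bound helper `expensiveSum` and `Dispatchers.Default` as the target dispatcher. Both functions return the same value; the second one skips creating a `Deferred` that is awaited right away.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical CPU-bound work.
fun expensiveSum(n: Int): Long = (1..n).sumOf { it.toLong() }

// Starts a coroutine only to wait for it immediately: no concurrency is gained.
suspend fun viaAsyncAwait(n: Int): Long = coroutineScope {
    async(Dispatchers.Default) { expensiveSum(n) }.await()
}

// Same result and same dispatcher switch, without the intermediate Deferred.
suspend fun viaWithContext(n: Int): Long =
    withContext(Dispatchers.Default) { expensiveSum(n) }

fun main(): Unit = runBlocking {
    println(viaAsyncAwait(100))  // 5050
    println(viaWithContext(100)) // 5050
}
```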
c
ok, the real code looks like this:
@Transactional
fun processEvents() = runBlocking {
  eventRepository.listNextBatch()
    .map { async { processInNewTransaction(it) } }
    .forEach { it.await() }
}
maybe the sequential await and the choice of async over launch here are tied to some requirement I'm not aware of? But so far it looks like the events are not related to each other, and launch should work.
a
at a minimum I'd ask why this code is using async/await instead of launch/join, since the result of the await is being ignored
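For comparison, a sketch of the launch/join form, assuming a hypothetical `handle` function whose return value nobody needs. `launch` returns a `Job` rather than a `Deferred`, so there is no result to carry around or ignore.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical per-event work with no meaningful return value.
suspend fun handle(id: Int) {
    delay(10)
}

// Launches one child job per id, waits for all of them, and reports how many ran.
suspend fun processAll(ids: List<Int>): Int = coroutineScope {
    val processed = AtomicInteger(0)
    ids.map { id ->
        launch {
            handle(id)
            processed.incrementAndGet()
        }
    }.joinAll() // List<Job>: wait for completion only, nothing to unwrap
    processed.get()
}

fun main(): Unit = runBlocking {
    println(processAll(listOf(1, 2, 3))) // 3
}
```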
k
that
runBlocking
is also a smell. That's mostly intended for tests. The problems here seem a bit more fundamental, like the type that requires a refactor.
a
also +1 to what @Kevin Gorham said; the
runBlocking
is highly suspicious; I'd expect it to be
suspend fun processEvents() = coroutineScope {
instead, so that the caller decides where it runs and how it's dispatched. As-is this is going to hard block a thread until suspending operations complete and that's rarely what you want.
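A rough sketch of that shape, not the real code: `listNextBatch` here is a hypothetical stand-in for the repository call, and the per-event work is passed in as a `handle` parameter so the example stays self-contained. The point is that `processEvents` only suspends, and the call site chooses how to bridge into it and on which dispatcher.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for eventRepository.listNextBatch().
fun listNextBatch(): List<String> = listOf("e1", "e2", "e3")

// Suspends instead of blocking; it does not pick a thread or dispatcher itself.
suspend fun processEvents(handle: suspend (String) -> Unit): Unit = coroutineScope {
    listNextBatch().forEach { event -> launch { handle(event) } }
}

fun main() {
    val seen = ConcurrentLinkedQueue<String>()
    // The caller decides where this runs: here, a blocking bridge onto Dispatchers.Default.
    runBlocking(Dispatchers.Default) { processEvents { event -> seen.add(event) } }
    println(seen.sorted()) // [e1, e2, e3]
}
```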
c
I see. Ok, I'll set aside some time to dive deeper into coroutines and schedule a re-review for the future. For now at least it seems that the code is fulfilling business requirements (tests pass and manual QA is happy) 🙂 So I'll leave this be and set a todo for later. Thanks for useful pointers.
👍 1
a
also useful to know is that when you
launch
(or
async
) into a surrounding coroutine scope like this code is doing (or like replacing
runBlocking {}
with
coroutineScope {}
would do) all child jobs must
.join()
before the scoping block will return. So explicitly joining the child jobs isn't even necessary, it'll be done automatically.
The
processInNewTransaction
function is marked
suspend
, right?
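The automatic join can be seen in a small sketch like this one (the log messages and delays are made up for illustration). No child is joined explicitly, yet the line after the `coroutineScope` block only runs once both children have finished.

```kotlin
import java.util.Collections
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

suspend fun runChildren(): List<String> {
    val log = Collections.synchronizedList(mutableListOf<String>())
    coroutineScope {
        launch { delay(50); log.add("child A done") }
        launch { delay(20); log.add("child B done") }
        // No join() calls: the scope itself waits for both children.
    }
    // Reached only after every child of the scope has completed.
    log.add("scope returned")
    return log
}

fun main(): Unit = runBlocking {
    println(runChildren().last()) // scope returned
}
```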
c
speaking of caller, the method I showed is invoked like this in Spring app:
@Scheduled(fixedDelay = EVENT_PROCESSING_BATCH_DELAY)
fun processEvents() {
  eventProcessor.processEvents()
}
@Adam Powell actually it is not
a
then what you have there is a very expensive
.forEach
🙂
c
😄
a
because a
runBlocking
is going to run all of those asyncs on the same (calling) thread, and each one is going to block before it allows the next one to run
c
are you sure?
k
Adam is right and I'd also be worried about any similar code this developer wrote that uses the
eventRepository
a
if
processInNewTransaction
was internally doing work that it could suspend during, you might see some benefits here, but if it's not farming off work into some sort of thread pooled dispatcher and awaiting the result itself, and the caller isn't doing a
withContext(SomeDispatcherBackedByAThreadPool)
to launch/async that work in parallel, then yeah, this is an expensive blocking loop running on the single calling thread.
Any time you see
runBlocking
, read it as, "run an event loop that concurrent jobs will post their continuation runnables to" - it allows for concurrency, not parallelism.
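One way to check this, as a sketch: `blockingWork` is a hypothetical non-suspending function (think of a plain JDBC call) that reports which thread ran it. With a bare `runBlocking`, every `async` should end up on the calling thread, one after another.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Hypothetical blocking (non-suspending) work; returns the thread it ran on.
fun blockingWork(id: Int): Thread {
    Thread.sleep(20) // blocks the thread, never suspends
    return Thread.currentThread()
}

// Collects the distinct threads used by four asyncs inside a plain runBlocking.
fun threadsUsed(): Set<Thread> = runBlocking {
    (1..4).map { async { blockingWork(it) } }.awaitAll().toSet()
}

fun main() {
    // The only thread involved is the caller of runBlocking.
    println(threadsUsed() == setOf(Thread.currentThread())) // true
}
```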
k
this developer seems to think that just by putting something inside an async block, it becomes suspending. That is sometimes what
async/await
means in other languages but that's not entirely how it works with coroutines.
c
haha, you're right
a
well, in this case they're really only a
runBlocking(Dispatchers.Default)
parameter away from getting that sort of behavior
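As a sketch of that one-parameter change, assuming a hypothetical blocking helper `slowDouble`: passing `Dispatchers.Default` makes the child coroutines inherit a thread-pool dispatcher, so the blocking calls can be scheduled across worker threads while the calling thread just waits. How many threads are actually used depends on the machine.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Hypothetical blocking work that must be thread-safe to run in parallel.
fun slowDouble(x: Int): Int {
    Thread.sleep(20)
    return x * 2
}

// Children inherit Dispatchers.Default from the runBlocking context.
fun processBatch(items: List<Int>): List<Int> = runBlocking(Dispatchers.Default) {
    items.map { async { slowDouble(it) } }.awaitAll()
}

fun main() {
    println(processBatch(listOf(1, 2, 3, 4))) // [2, 4, 6, 8]
}
```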
c
Thanks!
a
but since that will start actually running the code in parallel, and this code already demonstrates some other misunderstandings, I'd be concerned that
processInNewTransaction
isn't actually thread-safe and might start breaking in the presence of actual parallelism
c
processInNewTransaction is thread-safe, that I did check.
👍 1
a
hopefully not thread-safe by simply wrapping the whole logic in a
synchronized
block; then you'd be back to expensive serial execution again 😄
c
no 🙂
👍 1
j
Well, it's actually a lifesaver that the current solution runs single-threaded, because if that `@Transactional` is a Spring annotation, wouldn't it store the JDBC connection as a ThreadLocal? Which would then not be there any time the scheduler ran one of those `async`s on a separate thread?
Or does
processInNewTransaction
actually start a new transaction as the name implies? Which would still be problematic if the coroutine ever gets suspended. And what's the point of
@Transactional
then? These few lines are quite fascinating! :-)
c
that's as intended. Aside from the async block, there's stuff in that method that should be taken care of in the parent transaction (I omitted it, as it wasn't important to my question), while each async invocation should create, and does create, its own isolated transaction.
Regarding the
processInNewTransaction
, it constructs a TransactionTemplate and executes some logic via template's
execute
Yeah, fascinating indeed 🙂 I guess the moral is one shouldn't use shiny new technologies without understanding how they do what they do, or at least how they should be used for a specific case 🙂 I think the dev should've just moved the
processInNewTransaction
method to another component and declared it
@Async
🙂 but to re-iterate, coroutines are so new and shiny 🤩