Czar
12/14/2019, 4:08 PMsomeList.map { async { /* stuff */ }.forEach { it.await() }
My immediate reaction is to refactor it to
someList.map { async { /* stuff */ }.awaitAll()
But I see that awaitAll
allocates an array, so am I actually hurting performance instead of improving things? Is forEach { it.await() }
a more correct way to do await
on a collection of Deferred
?Adam Powell
12/14/2019, 4:14 PMCzar
12/14/2019, 4:15 PMlaunch
, too.Adam Powell
12/14/2019, 4:16 PMKevin Gorham
12/14/2019, 4:17 PMwithContext
instead because you're not really awaiting.Czar
12/14/2019, 4:19 PM@Transactional
fun processEvents() = runBlocking {
eventRepository.listNextBatch()
.map { async { processInNewTransaction(it) } }
.forEach { it.await() }
}
Adam Powell
12/14/2019, 4:22 PMKevin Gorham
12/14/2019, 4:22 PMrunBlocking
is also a smell. That's mostly intended for tests. The problems here seem a bit more fundamental--like the type that require a refactor.Adam Powell
12/14/2019, 4:24 PMrunBlocking
is highly suspicious; I'd expect it to be suspend fun processEvents() = coroutineScope {
instead, so that the caller decides where it runs and how it's dispatched. As-is this is going to hard block a thread until suspending operations complete and that's rarely what you want.Czar
12/14/2019, 4:25 PMAdam Powell
12/14/2019, 4:28 PMlaunch
(or async
) into a surrounding coroutine scope like this code is doing (or like replacing runBlocking {}
with coroutineScope {}
would do) all child jobs must .join()
before the scoping block will return. So explicitly joining the child jobs isn't even necessary, it'll be done automatically.processInNewTransaction
function is marked suspend
, right?Czar
12/14/2019, 4:29 PM@Scheduled(fixedDelay = EVENT_PROCESSING_BATCH_DELAY)
fun processEvents() {
eventProcessor.processEvents()
}
Adam Powell
12/14/2019, 4:30 PM.forEach
🙂Czar
12/14/2019, 4:31 PMAdam Powell
12/14/2019, 4:31 PMrunBlocking
is going to run all of those asyncs on the same (calling) thread, and each one is going to block before it allows the next one to runCzar
12/14/2019, 4:32 PMKevin Gorham
12/14/2019, 4:33 PMeventRepository
Adam Powell
12/14/2019, 4:33 PMprocessInNewTransaction
was internally doing work that it could suspend during, you might see some benefits here, but if it's not farming off work into some sort of thread pooled dispatcher and awaiting the result itself, and the caller isn't doing a withContext(SomeDispatcherBackedByAThreadPool)
to launch/async that work in parallel, then yeah, this is an expensive blocking loop running on the single calling thread.runBlocking
, read it as, "run an event loop that concurrent jobs will post their continuation runnables to" - it allows for concurrency, not parallelism.Kevin Gorham
12/14/2019, 4:36 PMasync/await
means in other languages but that's not entirely how it works with coroutines.Czar
12/14/2019, 4:37 PMAdam Powell
12/14/2019, 4:37 PMrunBlocking(Dispatchers.Default)
parameter away from getting that sort of behaviorCzar
12/14/2019, 4:37 PMAdam Powell
12/14/2019, 4:38 PMprocessInTransaction
isn't actually thread-safe and might start breaking in the presence of actual parallelismCzar
12/14/2019, 4:39 PMAdam Powell
12/14/2019, 4:39 PMsynchronized
block, then you'd be back to expensive serial execution again 😄Czar
12/14/2019, 4:40 PMJoris PZ
12/14/2019, 6:13 PMprocessInNewTransaction
actually start a new transaction as the name implies? Which would still be problematic if the coroutine ever gets suspended. And what's the point of @Transactional
then?
These few lines are quite fascinating! :-)Czar
12/14/2019, 7:01 PMprocessInNewTransaction
, it constructs a TransactionTemplate and executes some logic via template's execute
processInNewTransaction
method to another component and declared it @Async
🙂 but to re-iterate, coroutines are so new and shiny 🤩