Hugo Costa
05/30/2024, 9:06 AM
runBlocking(Dispatchers.Default) {
    (...)
    activeKotlinRules.asFlow()
        .mapNotNull {
            // Get an instance
            ruleRepository.getRule(RuleTag.valueOf(it.ruleName))
        }.flatMapMerge { rule ->
            requests.asFlow()
                .flatMapMerge { request ->
                    rule.validate(request, network)
                }
        }
}
All of the rules' `validate` functions are suspending, and so is almost everything inside them. Nonetheless, looking at the logs, it seems like we're getting stuck waiting for queries (using the Kotlin SDK, btw) to happen. Am I not fully utilising the multithreaded/suspend capabilities of my code? I would expect that as soon as it hits the SDK code, it would let it run on the IO dispatcher and start another rule/request pair.
For reference, the SDK code all looks something like this (which the Kotlin SDK team tells me isn't even necessary, because internally they automatically move all their operations to the IO dispatcher):
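suspend fun executeQuery(query: String): String = withContext(Dispatchers.IO) {
    // Presumably a separate helper that submits the statement; calling this same function would recurse
    val statementId = executeQuery(query)
    waitForStatement(statementId)
    // The last expression is the block's result; a bare `return` is not allowed inside withContext's lambda
    redshiftClient.getStatementResult(
        GetStatementResultRequest { id = statementId },
    )
}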
Sam
05/30/2024, 9:08 AM
> would expect that as soon as it hits the SDK code, it would let it run on the IO dispatcher and start another rule/request pair.
This might be a misunderstanding. Suspending functions are sequential by default. Whether you switch dispatchers or not, a suspending function will always suspend to wait for all its work to complete. If you wanted concurrency, you'd need to introduce it explicitly by launching additional coroutines.
Sam
05/30/2024, 9:09 AM
Sam
05/30/2024, 9:10 AM
`executeQuery` function relate to the earlier code? Is it called inside the `ruleRepository`?
Hugo Costa
05/30/2024, 9:44 AM
> Suspending functions are sequential by default
So even though it can be suspended, it doesn't necessarily mean it will? i.e. if I have a simple for loop with 2 items, it won't necessarily start work on the 2nd item until the first one is done?
Hugo Costa
05/30/2024, 9:45 AM
`executeQuery` is a function that is triggered by some of the rules, within the `rule.validate(request, network)` call.
Sam
05/30/2024, 9:49 AM
> if I have a simple for loop with 2 items, it won't necessarily start work on the 2nd item until the first one is done?
Correct; it will always wait for the first one before continuing to the second one.
> even though it can be suspended, it doesn't necessarily mean it will?
Actually, "suspended" means "paused". If you call a suspending function inside a loop, then when it suspends, it just pauses the whole loop. That's why the loop does not continue until the suspending function is finished.
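A minimal sketch of Sam's point, assuming kotlinx.coroutines is on the classpath; `fakeQuery` is a hypothetical stand-in for a suspending SDK call such as `executeQuery`. The plain loop pauses on each call, while launching one coroutine per item lets both calls be suspended at the same time:

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for a suspending call such as a query: suspends for `ms` milliseconds.
suspend fun fakeQuery(ms: Long) = delay(ms)

// Plain loop: each call suspends (pauses) the loop, so the items run one after the other.
suspend fun sequentialLoop(): Long = measureTimeMillis {
    repeat(2) { fakeQuery(200) }
}

// One coroutine per item: `launch` returns immediately, so both calls are suspended at once,
// and `coroutineScope` only returns after both children have finished.
suspend fun concurrentLoop(): Long = measureTimeMillis {
    coroutineScope {
        repeat(2) { launch { fakeQuery(200) } }
    }
}
```

Called from `runBlocking`, `sequentialLoop()` should take roughly 400 ms and `concurrentLoop()` roughly 200 ms; exact numbers depend on the machine.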
Hugo Costa
05/30/2024, 9:50 AM
Hugo Costa
05/30/2024, 9:51 AM
supervisorScope {
    requests.forEach { request ->
        launch {
            try {
                val validationRules = getActiveValidationRules()
                validateRequest(request, validationRules)
            } catch (e: Exception) {
                // Handle the exception for the current request
                // without affecting other requests
                handleValidationError(request, e)
            }
        }
    }
}
In this case, as I understand it, each rule/request would be its own job and would run/pause as necessary, right?
Sam
05/30/2024, 9:52 AM
`launch` or `async`.
Sam
05/30/2024, 9:53 AM
`launch` returns immediately, letting the `forEach` loop continue right away. Then, once the loop is done, the `supervisorScope` is actually responsible for waiting for all the coroutines to finish.
Hugo Costa
05/30/2024, 9:54 AM
`flatMapMerge` then is a bit confusing to me: it allows for concurrency but will still do things sequentially, without any possibility of running it in parallel?
Sam
05/30/2024, 9:55 AM
uli
05/30/2024, 9:55 AM
`async` instead of `launch`, and after all the `async`s are created, you can `awaitAll`.
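A sketch contrasting the two options discussed above, assuming kotlinx.coroutines; `validate` is a hypothetical stand-in for `rule.validate(request, network)` that fails for negative input. `launch` inside `supervisorScope` suits fire-and-forget work (results have to be pushed into shared state), while `async` plus `awaitAll` hands the results back in input order:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical validation call: suspends briefly, then fails for negative input.
suspend fun validate(request: Int): String {
    delay(100)
    require(request >= 0) { "invalid request $request" }
    return "ok:$request"
}

// `launch` per request inside `supervisorScope`: the try/catch handles expected failures,
// and the supervisor keeps siblings running even if something escapes it.
// Results go into a thread-safe queue; sorting makes the completion order irrelevant.
suspend fun validateWithLaunch(requests: List<Int>): List<String> {
    val results = ConcurrentLinkedQueue<String>()
    supervisorScope {
        requests.forEach { request ->
            launch {
                try {
                    results.add(validate(request))
                } catch (e: IllegalArgumentException) {
                    results.add("error:$request")
                }
            }
        }
    }
    return results.sorted()
}

// `async` per request, then `awaitAll`: results come back in the same order as the input.
suspend fun validateWithAsync(requests: List<Int>): List<String> = coroutineScope {
    requests.map { request ->
        async {
            try {
                validate(request)
            } catch (e: IllegalArgumentException) {
                "error:$request"
            }
        }
    }.awaitAll()
}
```

Catching a specific exception type rather than `Exception` avoids accidentally swallowing `CancellationException`, which coroutines use to signal cancellation.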
Hugo Costa
05/30/2024, 9:57 AM
{
"timestamp": "2024-05-30T09:43:54.420Z",
"lineNumber": 287,
"class": "Rule#1",
"loggerName": "...",
"message": "...",
"level": "INFO",
"thread": "DefaultDispatcher-worker-4"
}
{
"timestamp": "2024-05-30T09:44:00.761Z",
"lineNumber": 153,
"class": "Rule#2",
"loggerName": "...",
"message": "...",
"level": "INFO",
"thread": "DefaultDispatcher-worker-8"
}
Looking at the logs, it appears there are specific workers for each rule, but they're all sequential, as discussed.
Hugo Costa
05/30/2024, 9:59 AM
val validationResults = requests.map { request ->
    async {
        try {
            val validationRules = getActiveValidationRules()
            validateRequest(request, validationRules)
        } catch (e: Exception) {
            handleValidationError(request, e)
        }
    }
}
Hugo Costa
05/30/2024, 10:01 AM
`chunked` API that was just launched 🙂)
uli
05/30/2024, 10:09 AM
fun CoroutineScope.validationResultFlow(
    activeKotlinRules: List<Any>,
    requests: List<Any>,
): Flow<Any> = callbackFlow {
    activeKotlinRules.forEach {
        // Get an instance
        launch(Dispatchers.Default) {
            val rule = ruleRepository.getRule(RuleTag.valueOf(it.ruleName))
            requests.forEach { request ->
                launch {
                    send(rule.validate(request, network))
                }
            }
        }
    }
}
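For comparison, a minimal sketch of the same fan-out built on `channelFlow`, assuming kotlinx.coroutines; rules are simplified to a hypothetical `Rule` alias for `suspend (String) -> String` in place of `rule.validate(request, network)`. A `channelFlow` completes only after all coroutines launched inside it finish, and it needs no `CoroutineScope` receiver; a `callbackFlow` block, by contrast, is expected to end with `awaitClose { ... }`:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch

// Hypothetical simplification: a rule is just a suspending function from request to result.
typealias Rule = suspend (String) -> String

// channelFlow gives the block its own scope, so no CoroutineScope receiver is needed.
// Every `launch` is a child of that scope, and the flow completes only after all of them finish.
fun validationResultFlow(rules: List<Rule>, requests: List<String>): Flow<String> = channelFlow {
    rules.forEach { rule ->
        requests.forEach { request ->
            launch {
                // `send` suspends if the collector falls behind
                send(rule(request))
            }
        }
    }
}

// Usage helper: collect every result; the order depends on which validation finishes first.
suspend fun allResults(rules: List<Rule>, requests: List<String>): List<String> =
    validationResultFlow(rules, requests).toList()
```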
Sam
05/30/2024, 10:26 AM
`callbackFlow` there 👍. There might be flow operators that would work, but this way seems much easier to understand. One note: there's no need for the `CoroutineScope` receiver here, since the `callbackFlow` has its own scope.
Sam
05/30/2024, 10:31 AM
`channelFlow` might be more appropriate than a `callbackFlow`. They're basically the same, but a `channelFlow` is designed to wait for its child coroutines, whereas a `callbackFlow` would end the flow at the end of the loop (which is not what's wanted here).
Hugo Costa
05/30/2024, 10:33 AM
`launch(Dispatchers.Default)` for both, right? Or since we're launching within a launch, it doesn't really matter?
Sam
05/30/2024, 10:34 AM
Hugo Costa
05/30/2024, 10:35 AM
fun invoke(request: ValidationLambdaInput): String = runBlocking(Dispatchers.Default) {
    /.../
}
So in practice, I should be able to omit both, right?
Sam
05/30/2024, 10:38 AM
`validationResultFlow` example, if the flow is collected from a coroutine that's already using `Dispatchers.Default`, then there's no need to specify any further dispatcher inside the function. It won't hurt either, though.
Sam
05/30/2024, 10:39 AM
Hugo Costa
05/30/2024, 10:40 AM
Sam
05/30/2024, 10:42 AM
`GlobalScope.launch` will use `Dispatchers.Default`. And so will `CoroutineScope(EmptyCoroutineContext)` and `suspend fun main() = coroutineScope { ... }`. What you're seeing is probably down to the fact that `runBlocking` actually creates a scope that already comes with a non-default dispatcher. I agree, it can be a bit unexpected.
uli
05/30/2024, 10:45 AM
> Often people will specify a dispatcher just for safety, if they’re not sure where their code will be called from. That’s called “main-safety”, since it ensures the code is safe to call from the main thread in UI apps.
And you should. The rule is that no coroutine should block its caller's thread. So if something needs to run on Default/IO, specify it. If you are already on IO/Default, no dispatching should happen.
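A small sketch of both points, assuming kotlinx.coroutines; `blockingLookup` is a hypothetical blocking call. `runBlocking` without a dispatcher argument runs its coroutine on the calling thread, while a main-safe function moves its own blocking work to `Dispatchers.IO` with `withContext`, so callers don't need to know which dispatcher they are on. The shared worker threads are named `DefaultDispatcher-worker-N`, as in the logs earlier in the thread:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical blocking call standing in for e.g. a synchronous SDK or JDBC call.
// It reports which thread it ran on.
fun blockingLookup(): String {
    Thread.sleep(50)
    return Thread.currentThread().name
}

// Main-safe wrapper: it picks its own dispatcher for the blocking work,
// so it is safe to call from any coroutine, whatever dispatcher that coroutine uses.
suspend fun safeLookup(): String = withContext(Dispatchers.IO) { blockingLookup() }

// runBlocking without a dispatcher argument runs its coroutine on the calling thread
// (using an event loop it creates), not on Dispatchers.Default.
fun threadNames(): Pair<String, String> = runBlocking {
    val outer = Thread.currentThread().name // the caller's thread
    val inner = safeLookup()                // a shared Default/IO worker thread
    outer to inner
}
```

Per the `Dispatchers.IO` documentation, calling `withContext(Dispatchers.IO)` from `Dispatchers.Default` typically doesn't cause an actual thread switch, because the two share a thread pool.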
uli
05/30/2024, 10:50 AM
> fun invoke(request: ValidationLambdaInput): String = runBlocking(Dispatchers.Default) {
> /.../
> }
You mean you are collecting the flow within a `runBlocking(Dispatchers.Default)`? Or you are calling something like my `fun validationResultFlow` from the `runBlocking`?
None of those should require a `runBlocking`. Setting up the flow is `String` that `fun invoke` returns?
If you want to wait for all validation results, then maybe a flow is the wrong tool and we should go back to `async`.
Hugo Costa
05/30/2024, 10:53 AM
`ValidationLambdaInput`, I process it and then I return the link to the S3 with the result. Within this Lambda, I process the input, get to the rule/request run, and then save it.
Hugo Costa
05/30/2024, 10:53 AM
uli
05/30/2024, 10:56 AM
`runBlocking`, as you need to wait for the link and the upload to be completed from non-coroutine code.
uli
05/30/2024, 11:02 AM
> val validationResults = requests.map { request ->
>     async {
>         try {
>             val validationRules = getActiveValidationRules()
>             validateRequest(request, validationRules)
>         } catch (e: Exception) {
>             handleValidationError(request, e)
>         }
>     }
> }
Not sure. Maybe dismissing flows and getting back to something along the lines of your code snippet above might be more appropriate. How would you get that S3 link out of the `runBlocking`? Could you actually model it as an operation of the flow? Or would you have to put an effect into the state to park it in some variable and then return it?
Hugo Costa
05/30/2024, 11:05 AM
Hugo Costa
05/30/2024, 11:10 AM
`async` would be good to then allow me to do the SQS call, something like:
publisher.publishResults(results.await().toList(), request, resultLink)
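Putting the pieces together, a sketch of the Lambda shape discussed in this thread, assuming kotlinx.coroutines; `ValidationResult`, `validateRequest`, `uploadResults`, `Publisher` (with a simplified two-argument `publishResults`) and the `s3://` link are hypothetical placeholders, not the real SDK calls. `runBlocking` appears only in the blocking entry point, the suspending core fans out with `async`/`awaitAll`, and the S3 link comes out of `runBlocking` simply as the block's last expression:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins: none of these names come from a real SDK.
data class ValidationResult(val request: String, val ok: Boolean)

suspend fun validateRequest(request: String): ValidationResult {
    delay(100) // pretend this is a Redshift query
    return ValidationResult(request, ok = request.isNotBlank())
}

suspend fun uploadResults(results: List<ValidationResult>): String {
    delay(50) // pretend this is the S3 upload
    return "s3://bucket/results-${results.size}.json"
}

class Publisher {
    val published = mutableListOf<Pair<Int, String>>()
    suspend fun publishResults(results: List<ValidationResult>, resultLink: String) {
        delay(10) // pretend this is the SQS call
        published += results.size to resultLink
    }
}

// Suspending core: validate every request concurrently, then upload and publish once.
suspend fun handle(requests: List<String>, publisher: Publisher): String = coroutineScope {
    val results = requests.map { request -> async { validateRequest(request) } }.awaitAll()
    val resultLink = uploadResults(results)
    publisher.publishResults(results, resultLink)
    resultLink
}

// Blocking entry point (the Lambda handler): the only place runBlocking is used.
// The value of the block's last expression is what runBlocking returns.
fun invoke(requests: List<String>, publisher: Publisher): String =
    runBlocking(Dispatchers.Default) { handle(requests, publisher) }
```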
uli
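05/30/2024, 11:10 AM
// `launch` needs a CoroutineScope and returns a Job, so this sketch takes the scope as a receiver
fun CoroutineScope.invoke(request: ValidationLambdaInput, onCompletion: (String) -> Unit): Job = launch(Dispatchers.Default) {
    /.../
    onCompletion(s3Link)
}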
Hugo Costa
05/30/2024, 11:12 AM
uli
05/30/2024, 11:18 AM
`fun invoke` is never ever called from a suspend function, it is ok. But “never ever” means it must not even be called indirectly through a long call chain from a suspend function. Not today, and not tomorrow when someone else changes the code 🙂 And at least in the general case, this is not easy to be sure of.
Hugo Costa
05/30/2024, 11:20 AM
uli
05/30/2024, 11:21 AM
`invoke` is kind of your `main` function. I guess then that’s what `runBlocking` was designed for.
Hugo Costa
05/30/2024, 11:22 AM
`main` function; there's only a small abstract Lambda handler before it that transforms the JSON input with Jackson, which can run synchronously.
Hugo Costa
05/30/2024, 11:31 AM
uli
05/30/2024, 11:45 AM
Hugo Costa
05/30/2024, 12:05 PM
> The problem is that even though `flatMapMerge` starts a coroutine for each flow created by its transformation, the transformation itself is synchronous. It is visible in the implementation of `flatMapMerge`: the first step is `map`, which is a synchronous operation. That is why the above `studentsFlow` would fetch students synchronously. Here is an executable example:
Hugo Costa
05/30/2024, 12:06 PM
fun studentsFlow(studentIds: Flow<Int>): Flow<Student> = studentIds
    .flatMapMerge { flow { emit(fetchStudent(it)) } }

private suspend fun fetchStudent(id: Int): Student = ...
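The quoted behaviour can be checked with a small timing sketch, assuming kotlinx.coroutines; `fetch` is a hypothetical stand-in for `fetchStudent`. Work done inside the transform (`flowOf(fetch(it))`) runs one element at a time, work deferred into `flow { emit(...) }` runs concurrently, and the `concurrency` argument caps how many inner flows are collected at once:

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for fetchStudent: suspends for 100 ms.
suspend fun fetch(id: Int): Int {
    delay(100)
    return id
}

// fetch runs inside the transform, which flatMapMerge calls for one element at a time.
@OptIn(ExperimentalCoroutinesApi::class)
fun eager(ids: Flow<Int>): Flow<Int> = ids.flatMapMerge { flowOf(fetch(it)) }

// fetch is deferred into the inner flow, which flatMapMerge collects in its own coroutine.
// The default of 16 mirrors flatMapMerge's own default concurrency.
@OptIn(ExperimentalCoroutinesApi::class)
fun deferred(ids: Flow<Int>, concurrency: Int = 16): Flow<Int> =
    ids.flatMapMerge(concurrency) { flow { emit(fetch(it)) } }

// Returns the elapsed milliseconds and the sorted results for the ids 1..5.
suspend fun timed(build: (Flow<Int>) -> Flow<Int>): Pair<Long, List<Int>> {
    var results: List<Int> = emptyList()
    val ms = measureTimeMillis { results = build((1..5).asFlow()).toList().sorted() }
    return ms to results
}
```

With five 100 ms fetches, `eager` should take about 500 ms, `deferred` about 100 ms, and `deferred` with `concurrency = 2` about 300 ms.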
Hugo Costa
05/30/2024, 1:45 PM
Hugo Costa
05/30/2024, 1:45 PM
Hugo Costa
05/30/2024, 2:31 PM
Hugo Costa
05/30/2024, 3:19 PM
@OptIn(ExperimentalCoroutinesApi::class)
fun <T, Y> Flow<T>.concurrentFlatMapMerge(
    maxConcurrency: Int = Int.MAX_VALUE,
    transform: suspend (T) -> Y,
): Flow<Y> = this
    .flatMapMerge(concurrency = maxConcurrency) {
        flow { emit(transform(it)) }
    }
Created a unit test with a `Thread.sleep(500)` and compared running it 5 times with `flatMapMerge` vs `concurrentFlatMapMerge`. It comes to ~3050ms vs. ~550ms. Looking forward to seeing my validation service either flying or crashing due to too many concurrent jobs 😄
Hugo Costa
05/30/2024, 3:20 PM
Hugo Costa
05/31/2024, 8:16 AM