Hugo Costa
05/30/2024, 9:06 AM
runBlocking(Dispatchers.Default) {
    (...)
    activeKotlinRules.asFlow()
        .mapNotNull {
            // Get an instance
            ruleRepository.getRule(RuleTag.valueOf(it.ruleName))
        }.flatMapMerge { rule ->
            requests.asFlow()
                .flatMapMerge { request ->
                    rule.validate(request, network)
                }
        }
}
Every rule's `validate` function is a suspend function, and so is almost everything inside it. Nonetheless, looking at the logs, it seems like we're getting stuck waiting for queries (using the Kotlin SDK, btw) to complete. Am I not fully utilising the multithreaded/suspend capabilities of my code? I would expect that as soon as it hits the SDK code, it would let that run on the IO dispatcher and start another rule/request pair.
For reference, the SDK code all looks something like this (which, I'm told by the Kotlin SDK team, isn't even really necessary, because internally they automatically dispatch all their operations to the IO dispatcher):
suspend fun executeQuery(query: String): String = withContext(Dispatchers.IO) {
    // submitStatement: hypothetical helper that submits the SQL and returns its statement id
    val statementId = submitStatement(query)
    waitForStatement(statementId)
    // The block's last expression is the value returned by withContext
    redshiftClient.getStatementResult(
        GetStatementResultRequest { id = statementId },
    )
}
Sam
05/30/2024, 9:08 AM
> would expect that as soon as it hits the SDK code, it would let it run on the IO dispatcher and start another rule/request pair.
This might be a misunderstanding. Suspending functions are sequential by default. Whether you switch dispatchers or not, a suspending function will always suspend to wait for all its work to complete. If you wanted concurrency, you'd need to introduce it explicitly by launching additional coroutines.
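A minimal sketch of Sam's point, assuming `kotlinx-coroutines-core` is on the classpath. `fakeQuery` is a hypothetical stand-in for an SDK call like `executeQuery`, with `delay` simulating query latency. Both versions hop to `Dispatchers.IO`, but only the one that starts each call in its own coroutine with `async` overlaps the waits.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for an SDK call: it switches to Dispatchers.IO,
// but its caller still suspends until the whole block has finished.
suspend fun fakeQuery(id: Int): String = withContext(Dispatchers.IO) {
    delay(300) // simulated query latency
    "result-$id"
}

// Sequential: the second query only starts after the first one returns.
suspend fun sequentialMillis(): Long = measureTimeMillis {
    fakeQuery(1)
    fakeQuery(2)
}

// Concurrent: each query runs in its own coroutine, so the waits overlap.
suspend fun concurrentMillis(): Long = measureTimeMillis {
    coroutineScope {
        val first = async { fakeQuery(1) }
        val second = async { fakeQuery(2) }
        first.await()
        second.await()
    }
}

fun compareTimings(): Pair<Long, Long> = runBlocking {
    sequentialMillis() to concurrentMillis()
}
```

`compareTimings()` should report roughly 600 ms for the sequential version and roughly 300 ms for the concurrent one. Exact numbers vary by machine.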
Sam
05/30/2024, 9:09 AM
Sam
05/30/2024, 9:10 AM
How does the `executeQuery` function relate to the earlier code? Is it called inside the `ruleRepository`?
Hugo Costa
05/30/2024, 9:44 AM
> Suspending functions are sequential by default
So even though it can be suspended, it doesn't necessarily mean it will? I.e., if I have a simple for loop with 2 items, it won't necessarily start work on the 2nd item until the first one is done?
Hugo Costa
05/30/2024, 9:45 AM
`executeQuery` is a function that is triggered by some of the rules, within the `rule.validate(request, network)` call.
Sam
05/30/2024, 9:49 AM
> if I have a simple for loop with 2 items, it won't necessarily start work on the 2nd item until the first one is done?
Correct; it will always wait for the first one before continuing to the second one.
> even though it can be suspended, it doesn't necessarily mean it will?
Actually, "suspended" means "paused". If you call a suspending function inside a loop, then when it suspends, it just pauses the whole loop. That's why the loop does not continue until the suspending function is finished.
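The pausing behaviour is easy to see by recording events. In this sketch, the function names are hypothetical and `delay` stands in for real work. The plain loop finishes item 1 before starting item 2. The loop that calls `launch` starts both items first and relies on `supervisorScope` to wait for them, which is the shape Hugo proposes next.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

// A plain loop: the suspending delay pauses the loop itself,
// so item 2 cannot start until item 1 has finished.
fun sequentialLoop(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    for (id in 1..2) {
        events += "start $id"
        delay(100L * id) // simulated work
        events += "end $id"
    }
    events
}

// A loop that launches: launch returns immediately, so the loop keeps going,
// and supervisorScope waits for all launched children before returning.
fun launchingLoop(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    supervisorScope {
        for (id in 1..2) {
            launch {
                events += "start $id"
                delay(100L * id) // simulated work
                events += "end $id"
            }
        }
    }
    events
}
```

Everything here runs on `runBlocking`'s single thread, so the plain `MutableList` is safe. `sequentialLoop()` yields `start 1, end 1, start 2, end 2`, while `launchingLoop()` yields `start 1, start 2, end 1, end 2`.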
Hugo Costa
05/30/2024, 9:50 AM
Hugo Costa
05/30/2024, 9:51 AM
supervisorScope {
    requests.forEach { request ->
        launch {
            try {
                val validationRules = getActiveValidationRules()
                validateRequest(request, validationRules)
            } catch (e: Exception) {
                // Handle the exception for the current request
                // without affecting other requests
                handleValidationError(request, e)
            }
        }
    }
}
In this case, as I understand it, each rule/request would be its own job and would run/pause as necessary, right?
Sam
05/30/2024, 9:52 AM
`launch` or `async`.
Sam
05/30/2024, 9:53 AM
`launch` returns immediately, letting the `forEach` loop continue right away. Then, once the loop is done, the `supervisorScope` is actually responsible for waiting for all the coroutines to finish.
Hugo Costa
05/30/2024, 9:54 AM
`flatMapMerge` then is a bit confusing to me: it allows for concurrency but will still do things sequentially, without any possibility of running it in parallel?
Sam
05/30/2024, 9:55 AM
uli
05/30/2024, 9:55 AM
`async` instead of `launch`, and after all the `async`s are created, you can `awaitAll`.
Hugo Costa
05/30/2024, 9:57 AM
{
  "timestamp": "2024-05-30T09:43:54.420Z",
  "lineNumber": 287,
  "class": "Rule#1",
  "loggerName": "...",
  "message": "...",
  "level": "INFO",
  "thread": "DefaultDispatcher-worker-4"
}
{
  "timestamp": "2024-05-30T09:44:00.761Z",
  "lineNumber": 153,
  "class": "Rule#2",
  "loggerName": "...",
  "message": "...",
  "level": "INFO",
  "thread": "DefaultDispatcher-worker-8"
}
Looking at the logic, it appears there are specific workers for each rule, but they're all sequential, as discussed.
Hugo Costa
05/30/2024, 9:59 AM
val validationResults = requests.map { request ->
    async {
        try {
            val validationRules = getActiveValidationRules()
            validateRequest(request, validationRules)
        } catch (e: Exception) {
            handleValidationError(request, e)
        }
    }
}
Hugo Costa
05/30/2024, 10:01 AM
`chunked` API that was just launched 🙂 )
uli
05/30/2024, 10:09 AM
fun CoroutineScope.validationResultFlow(
    activeKotlinRules: List<Any>,
    requests: List<Any>,
): Flow<Any> = callbackFlow {
    activeKotlinRules.forEach {
        // Get an instance
        launch(Dispatchers.Default) {
            val rule = ruleRepository.getRule(RuleTag.valueOf(it.ruleName))
            requests.forEach { request ->
                launch {
                    send(rule.validate(request, network))
                }
            }
        }
    }
}
Sam
05/30/2024, 10:26 AM
`callbackFlow` there 👍. There might be flow operators that would work, but this way seems much easier to understand. One note: there's no need for the `CoroutineScope` receiver here, since the `callbackFlow` has its own scope.
Sam
05/30/2024, 10:31 AM
`channelFlow` might be more appropriate than a `callbackFlow`. They're basically the same, but a `channelFlow` is designed to wait for its child coroutines, whereas a `callbackFlow` would end the flow at the end of the loop (which is not what's wanted here).
Hugo Costa
05/30/2024, 10:33 AM
`launch(Dispatchers.Default)` for both, right? Or since we're launching within a launch, it doesn't really matter?
Sam
05/30/2024, 10:34 AM
Hugo Costa
05/30/2024, 10:35 AM
fun invoke(request: ValidationLambdaInput): String = runBlocking(Dispatchers.Default) {
    /.../
}
So in practice, I should be able to omit both, right?
Sam
05/30/2024, 10:38 AM
In the `validationResultFlow` example, if the flow is collected from a coroutine that's already using `Dispatchers.Default`, then there's no need to specify any further dispatcher inside the function. It won't hurt either, though.
Sam
05/30/2024, 10:39 AM
Hugo Costa
05/30/2024, 10:40 AM
Sam
05/30/2024, 10:42 AM
`GlobalScope.launch` will use `Dispatchers.Default`. And so will `CoroutineScope(EmptyCoroutineContext)` and `suspend fun main() = coroutineScope { ... }`. What you're seeing is probably down to the fact that `runBlocking` actually creates a scope that already comes with a non-default dispatcher. I agree, it can be a bit unexpected.
uli
05/30/2024, 10:45 AM
> Often people will specify a dispatcher just for safety, if they’re not sure where their code will be called from. That’s called “main-safety”, since it ensures the code is safe to call from the main thread in UI apps.
And you should. The rule is: no coroutine should block its caller’s thread. So if something needs to run on Default/IO, specify it. If you are already on IO/Default, no dispatching should happen.
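A minimal sketch of the main-safety rule. It assumes a blocking call that must not run on the caller's thread; here that call is the hypothetical `blockingLookup`, simulated with `Thread.sleep`. The suspend wrapper always moves that work to `Dispatchers.IO`. The caller, a `runBlocking` on the current thread, just suspends and resumes on its own thread afterwards.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical blocking call (think JDBC or a blocking HTTP client).
fun blockingLookup(key: String): String {
    Thread.sleep(50) // blocks whichever thread executes it
    return "value-for-$key"
}

// Main-safe wrapper: whatever thread the caller is on, the blocking work runs
// on Dispatchers.IO. If the caller is already on Dispatchers.IO, withContext
// sees the same dispatcher and runs the block without re-dispatching.
suspend fun lookup(key: String): Pair<String, Thread> = withContext(Dispatchers.IO) {
    blockingLookup(key) to Thread.currentThread()
}

// Calls the wrapper from runBlocking on the current thread and reports the
// value, the thread that did the blocking work, and whether the caller
// resumed on its own thread afterwards.
fun demo(): Triple<String, Thread, Boolean> {
    val caller = Thread.currentThread()
    return runBlocking {
        val (value, worker) = lookup("a")
        Triple(value, worker, Thread.currentThread() === caller)
    }
}
```

`Dispatchers.IO` shares its threads with `Dispatchers.Default`, so the worker thread is typically named `DefaultDispatcher-worker-N`.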
uli
05/30/2024, 10:50 AM
> fun invoke(request: ValidationLambdaInput): String = runBlocking(Dispatchers.Default) {
> /.../
> }
Do you mean you are collecting the flow within a `runBlocking(Dispatchers.Default)`? Or are you calling something like my `fun validationResultFlow` from the `runBlocking`?
None of those should require a `runBlocking`. Setting up the flow is `String` that `fun invoke` returns?
If you want to wait for all validation results, then maybe a flow is the wrong tool and we should go back to `async`.
Hugo Costa
05/30/2024, 10:53 AM
`ValidationLambdaInput`, I process it and then I return the link to the S3 object with the result. Within this Lambda, I process the input, run the rule/request pairs, and then save the result.
Hugo Costa
05/30/2024, 10:53 AM
uli
05/30/2024, 10:56 AM
`runBlocking`, as you need to wait for the link and the upload to be completed from non-coroutine code.
uli
05/30/2024, 11:02 AM
> val validationResults = requests.map { request ->
>     async {
>         try {
>             val validationRules = getActiveValidationRules()
>             validateRequest(request, validationRules)
>         } catch (e: Exception) {
>             handleValidationError(request, e)
>         }
>     }
> }
Not sure. Maybe dismissing flows and getting back to something along the lines of your code snippet above might be more appropriate. How would you get that S3 link out of the `runBlocking`? Could you actually model it as an operation of the flow? Or would you have to put an effect into the state, to park it in some variable and then return it?
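One way to follow uli's suggestion is to skip flows for this step. The sketch below validates all requests concurrently with `async`/`awaitAll`, using a single `runBlocking` at the entry point, and then uploads the results. It rests on several assumptions: `Request`, `ValidationResult`, `validateRequest`, `uploadResults` and the `s3://example-bucket/...` link are hypothetical stand-ins, not the service's real types.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical types; the real service's request/result types are not shown.
data class Request(val id: Int)

sealed interface ValidationResult {
    data class Passed(val requestId: Int) : ValidationResult
    data class Failed(val requestId: Int, val reason: String) : ValidationResult
}

// Hypothetical validation step; delay stands in for the rules' SDK queries.
suspend fun validateRequest(request: Request): ValidationResult {
    delay(100)
    require(request.id >= 0) { "negative id" }
    return ValidationResult.Passed(request.id)
}

// Hypothetical upload step that returns a link to the stored results.
suspend fun uploadResults(results: List<ValidationResult>): String {
    delay(10)
    return "s3://example-bucket/results-${results.size}.json"
}

// Validates all requests concurrently; results keep the order of `requests`.
suspend fun validateAll(requests: List<Request>): List<ValidationResult> = coroutineScope {
    requests.map { request ->
        async {
            try {
                validateRequest(request)
            } catch (e: CancellationException) {
                throw e // let cancellation propagate normally
            } catch (e: Exception) {
                // A failure in one request becomes a result instead of
                // cancelling the sibling validations
                ValidationResult.Failed(request.id, e.message ?: "unknown error")
            }
        }
    }.awaitAll()
}

// Entry point called from non-coroutine code (e.g. a Lambda handler),
// so runBlocking appears exactly once, at the top.
fun invoke(requests: List<Request>): String = runBlocking(Dispatchers.Default) {
    uploadResults(validateAll(requests))
}
```

Each `async` catches its own exception, so one failing request becomes a `Failed` result instead of cancelling the others. `awaitAll` returns the results in the same order as `requests`, which makes it straightforward to pass them on to a later step such as the SQS publish.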
Hugo Costa
05/30/2024, 11:05 AM
Hugo Costa
05/30/2024, 11:10 AM
`async` would be good, to then allow me to do the SQS call, something like:
publisher.publishResults(results.await().toList(), request, resultLink)
uli
05/30/2024, 11:10 AM
// Needs an enclosing CoroutineScope for launch; returns the launched Job
fun invoke(request: ValidationLambdaInput, onCompletion: (String) -> Unit): Job = launch(Dispatchers.Default) {
    /.../
    onCompletion(s3Link)
}
Hugo Costa
05/30/2024, 11:12 AM
uli
05/30/2024, 11:18 AM
If `fun invoke` is never ever called from a suspend function, it is ok. But “never ever” means it must not even be called indirectly, through a long call chain, from a suspend function. Not today. And not tomorrow, when someone else changes the code 🙂 And, at least in the general case, this is not easy to be sure of.
Hugo Costa
05/30/2024, 11:20 AM
uli
05/30/2024, 11:21 AM
`invoke` is kind of your main function. I guess then that’s what `runBlocking` was designed for.
Hugo Costa
05/30/2024, 11:22 AM
`main` function; there's only a small abstract Lambda handler in front of it that transforms the JSON input with Jackson, which can run synchronously.
Hugo Costa
05/30/2024, 11:31 AM
uli
05/30/2024, 11:45 AM
Hugo Costa
05/30/2024, 12:05 PM
The problem is that, even though `flatMapMerge` starts a coroutine for each flow created by its transformation, the transformation itself is synchronous. It is visible in the implementation of `flatMapMerge`: the first step is `map`, which is a synchronous operation. That is why the above `studentsFlow` would fetch students synchronously. Here is an executable example:
Hugo Costa
05/30/2024, 12:06 PM
fun studentsFlow(studentIds: Flow<Int>): Flow<Student> = studentIds
    .flatMapMerge { flow { emit(fetchStudent(it)) } }

private suspend fun fetchStudent(id: Int): Student = ...
Hugo Costa
05/30/2024, 1:45 PM
Hugo Costa
05/30/2024, 1:45 PM
Hugo Costa
05/30/2024, 2:31 PM
Hugo Costa
05/30/2024, 3:19 PM
@OptIn(ExperimentalCoroutinesApi::class)
fun <T, Y> Flow<T>.concurrentFlatMapMerge(
    maxConcurrency: Int = Int.MAX_VALUE,
    transform: suspend (T) -> Y
): Flow<Y> = this
    .flatMapMerge(concurrency = maxConcurrency) {
        flow { emit(transform(it)) }
    }
Created a unit test with a `Thread.sleep(500)` and compared running it 5 times, `flatMapMerge` vs. `concurrentFlatMapMerge`. It comes to ~3050 ms vs. ~550 ms. Looking forward to seeing my validation service either flying or crashing due to too many concurrent jobs 😄
Hugo Costa
05/30/2024, 3:20 PM
Hugo Costa
05/31/2024, 8:16 AM
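To reproduce the `flatMapMerge` vs. `concurrentFlatMapMerge` comparison without depending on thread counts, this sketch swaps `Thread.sleep` for `delay` and times five 200 ms jobs. It reuses the shape of `concurrentFlatMapMerge` from the thread; `work`, `plainMillis` and `wrappedMillis` are hypothetical helpers. Work done directly inside the `flatMapMerge` lambda runs in its internal `map` step, one element at a time. Work wrapped in `flow { }` runs in the merged child coroutines instead. The `maxConcurrency` argument caps how many of those run at once, which is one way to guard against too many concurrent jobs. `flatMapMerge`'s own default cap is 16.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger
import kotlin.system.measureTimeMillis

// Same shape as concurrentFlatMapMerge from the thread.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T, Y> Flow<T>.concurrentFlatMapMerge(
    maxConcurrency: Int = Int.MAX_VALUE,
    transform: suspend (T) -> Y,
): Flow<Y> = flatMapMerge(concurrency = maxConcurrency) { flow { emit(transform(it)) } }

private val inFlight = AtomicInteger(0)
private val peak = AtomicInteger(0)

// Hypothetical 200 ms job that records how many jobs are running at once.
suspend fun work(x: Int): Int {
    val running = inFlight.incrementAndGet()
    peak.accumulateAndGet(running) { a, b -> maxOf(a, b) }
    delay(200)
    inFlight.decrementAndGet()
    return x * 2
}

// The job runs inside the flatMapMerge lambda itself, i.e. in its internal
// map step, so the five jobs run one after another.
@OptIn(ExperimentalCoroutinesApi::class)
fun plainMillis(): Long = runBlocking {
    measureTimeMillis {
        (1..5).asFlow().flatMapMerge { x -> flowOf(work(x)) }.toList()
    }
}

// The job is wrapped in flow { }, so it runs in the merged child coroutines,
// at most maxConcurrency at a time. Returns elapsed time and peak concurrency.
fun wrappedMillis(maxConcurrency: Int): Pair<Long, Int> = runBlocking {
    peak.set(0)
    val elapsed = measureTimeMillis {
        (1..5).asFlow().concurrentFlatMapMerge(maxConcurrency) { work(it) }.toList()
    }
    elapsed to peak.get()
}
```

Expect roughly 1000 ms for `plainMillis()`, about 200 ms with a peak of 5 for `wrappedMillis(Int.MAX_VALUE)`, and about 600 ms with a peak of 2 for `wrappedMillis(2)`. With `Thread.sleep`, as in the unit test above, the overlap additionally depends on collecting on a multi-threaded dispatcher such as `Dispatchers.Default`, since a blocked thread cannot run another coroutine.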