# coroutines
h
Hello Coroutines friends! I have a simple validation service (JVM) that takes as input a list of validation rules and a list of requests. Every request needs to be validated against each rule. Currently, I have the following setup (simplified; I also have some metrics and try/catch):
```kotlin
runBlocking(Dispatchers.Default) {
    // (...)
    activeKotlinRules.asFlow()
        .mapNotNull {
            // Get an instance
            ruleRepository.getRule(RuleTag.valueOf(it.ruleName))
        }
        .flatMapMerge { rule ->
            requests.asFlow()
                .flatMapMerge { request ->
                    rule.validate(request, network)
                }
        }
}
```
All of the rules' `validate` functions are suspending, and mostly everything within them as well. Nonetheless, looking at the logs, it seems like we're getting stuck waiting for queries (using the Kotlin SDK, btw) to happen. Am I not fully utilising the multithreaded/suspend capabilities of my code? I would expect that as soon as it hits the SDK code, it would let it run on the IO dispatcher and start another rule/request pair. For reference, the SDK code is all something like this (which I'm told by the Kotlin SDK team is not even that necessary, because internally they automatically make all their ops go to the IO dispatcher):
```kotlin
suspend fun executeQuery(query: String): String = withContext(Dispatchers.IO) {
    val statementId = executeQuery(query)
    waitForStatement(statementId)

    redshiftClient.getStatementResult(
        GetStatementResultRequest { id = statementId },
    )
}
```
s
> I would expect that as soon as it hits the SDK code, it would let it run on the IO dispatcher and start another rule/request pair.
This might be a misunderstanding. Suspending functions are sequential by default. Whether you switch dispatchers or not, a suspending function will always suspend to wait for all its work to complete. If you wanted concurrency, you'd need to introduce it explicitly by launching additional coroutines.
I haven't fully understood the flow code here, though, so I might be missing the point
How does the `executeQuery` function relate to the earlier code? Is it called inside the `ruleRepository`?
h
> Suspending functions are sequential by default
So even though it can be suspended, it doesn't necessarily mean it will? i.e. if I have a simple for loop with 2 items, it won't necessarily start work on the 2nd item until the first one is done?
`executeQuery` is a function that is triggered by some of the rules, within the `rule.validate(request, network)` call
👍 1
s
> if I have a simple for loop with 2 items, it won't necessarily start work on the 2nd item until the first one is done?
Correct; it will always wait for the first one before continuing to the second one.
> even though it can be suspended, it doesn't necessarily mean it will?
Actually, "suspended" means "paused". If you call a suspending function inside a loop, then when it suspends, it just pauses the whole loop. That's why the loop does not continue until the suspending function is finished.
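To make "suspending is sequential" concrete, here's a minimal sketch (the `slowOp` function and the 100 ms delay are made up for illustration): two plain calls take roughly the sum of their delays, while two `async` calls overlap.

```kotlin
import kotlinx.coroutines.*

suspend fun slowOp(): Int {
    delay(100) // stand-in for a suspending query
    return 1
}

// Plain calls in a row: the coroutine pauses at each call,
// so the second slowOp() only starts after the first finishes (~200 ms).
suspend fun sequential(): Int = slowOp() + slowOp()

// Explicit concurrency: each call gets its own coroutine,
// so both delays run at the same time (~100 ms).
suspend fun concurrent(): Int = coroutineScope {
    val a = async { slowOp() }
    val b = async { slowOp() }
    a.await() + b.await()
}

fun main() = runBlocking {
    println(sequential()) // 2
    println(concurrent()) // 2
}
```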
h
Ah-ha - that clears it up then. I need to find a way where the loop is "ok" with leaving stuff "undone" and continue while it waits for an IO op to finish, right?
I was chatting with ChatGPT about it, and without giving it any code, it mentioned I could do something like:
```kotlin
supervisorScope {
    requests.forEach { request ->
        launch {
            try {
                val validationRules = getActiveValidationRules()
                validateRequest(request, validationRules)
            } catch (e: Exception) {
                // Handle the exception for the current request
                // without affecting other requests
                handleValidationError(request, e)
            }
        }
    }
}
```
In this case, as I understand it, each rule/request would be its own job and would run/pause as necessary, right?
s
That makes sense 👍. When you want to start something without waiting for it, you typically use a coroutine builder like `launch` or `async`.
Each call to `launch` returns immediately, letting the `forEach` loop continue right away. Then, once the loop is done, the `supervisorScope` is actually responsible for waiting for all the coroutines to finish.
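A small sketch of that behavior (item count and delays are arbitrary): the loop finishes launching immediately, and `supervisorScope` only returns once every child completes, so later-launched items can finish first.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.ConcurrentLinkedQueue

suspend fun runAll(): List<Int> {
    val finished = ConcurrentLinkedQueue<Int>()
    supervisorScope {
        (1..3).forEach { i ->
            launch {
                delay((4 - i) * 100L) // item 3 finishes first, item 1 last
                finished.add(i)
            }
        }
        // At this point the forEach is done, but the children may still be running.
    }
    // supervisorScope has now waited for all three children.
    return finished.toList()
}

fun main() = runBlocking {
    println(runAll()) // [3, 2, 1]
}
```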
h
Ok, exactly what I was looking for. The `flatMapMerge` then is a bit confusing to me: it allows for concurrency but will still do things sequentially, without any possibility of running it in parallel?
s
To be honest, that's the part I didn't exactly follow in the original code. I am not sure whether the flow runs concurrently or not.
u
As I guess your validation requests need to return a result, you could use `async` instead of `launch`, and after all `async`s are created, you can `awaitAll`
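A sketch of that suggestion, with a hypothetical `validate` function standing in for the real rules: `map { async { ... } }` starts everything concurrently, and `awaitAll` gathers the results (rethrowing as soon as any deferred fails).

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for rule.validate(request, network)
suspend fun validate(request: String): String {
    delay(50)
    return "ok:$request"
}

suspend fun validateAll(requests: List<String>): List<String> = coroutineScope {
    requests
        .map { request -> async { validate(request) } } // start all concurrently
        .awaitAll() // wait for all; fails fast if any validation throws
}

fun main() = runBlocking {
    println(validateAll(listOf("a", "b", "c"))) // [ok:a, ok:b, ok:c]
}
```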
h
```json
{
    "timestamp": "2024-05-30T09:43:54.420Z",
    "lineNumber": 287,
    "class": "Rule#1",
    "loggerName": "...",
    "message": "...",
    "level": "INFO",
    "thread": "DefaultDispatcher-worker-4"
}

{
    "timestamp": "2024-05-30T09:44:00.761Z",
    "lineNumber": 153,
    "class": "Rule#2",
    "loggerName": "...",
    "message": "...",
    "level": "INFO",
    "thread": "DefaultDispatcher-worker-8"
}
```
Looking at the logs, it appears there are specific workers for each rule, but they're all sequential as discussed.
Uli, would it be something like this (AI code)?
```kotlin
val validationResults = requests.map { request ->
    async {
        try {
            val validationRules = getActiveValidationRules()
            validateRequest(request, validationRules)
        } catch (e: Exception) {
            handleValidationError(request, e)
        }
    }
}
```
The result is a `Flow<ValidationResult>`, as we're currently exploring emitting results as soon as they come in (with the new `chunked` API that was just launched 🙂)
u
If you want a flow, you might want to try something like this:

```kotlin
fun CoroutineScope.validationResultFlow(
    activeKotlinRules: List<Any>,
    requests: List<Any>,
): Flow<Any> = callbackFlow {
    activeKotlinRules.forEach {
        launch(Dispatchers.Default) {
            // Get an instance
            val rule = ruleRepository.getRule(RuleTag.valueOf(it.ruleName))
            requests.forEach { request ->
                launch {
                    send(rule.validate(request, network))
                }
            }
        }
    }
}
```
s
I like the idea of using a `callbackFlow` there 👍. There might be flow operators that would work, but this way seems much easier to understand. One note: there's no need for the `CoroutineScope` receiver here, since the `callbackFlow` has its own scope.
👍 1
Actually, I think a `channelFlow` might be more appropriate than a `callbackFlow`. They're basically the same, but a `channelFlow` is designed to wait for its child coroutines, whereas a `callbackFlow` would end the flow at the end of the loop (which is not what's wanted here)
👍 1
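A compact sketch of the `channelFlow` variant (item values and delays invented): coroutines launched inside the block `send` results as they finish, and the flow completes only once all children are done, with no `awaitClose` required.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun squares(items: List<Int>): Flow<Int> = channelFlow {
    items.forEach { item ->
        launch {
            delay(item * 20L)
            send(item * item) // emitted as each child finishes
        }
    }
    // No awaitClose needed: channelFlow waits for its children before completing.
}

fun main() = runBlocking {
    // Results arrive in completion order, not launch order.
    println(squares(listOf(3, 1, 2)).toList())
}
```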
h
I assume `launch(Dispatchers.Default)` for both, right? Or since we're launching within a launch, it doesn't really matter?
s
Correct, the dispatcher would be inherited from the outer coroutine, so you wouldn't need to specify it again.
h
I am currently running this in a Lambda, whose handleRequest function already specifies it:

```kotlin
fun invoke(request: ValidationLambdaInput): String = runBlocking(Dispatchers.Default) {
    /* ... */
}
```

So in practice, I should be able to omit both, right?
s
Yes. In the `validationResultFlow` example, if the flow is collected from a coroutine that's already using `Dispatchers.Default`, then there's no need to specify any further dispatcher inside the function. It won't hurt either, though.
Often people will specify a dispatcher just for safety, if they're not sure where their code will be called from. That's called "main-safety", since it ensures the code is safe to call from the main thread in UI apps.
👍 1
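The "main-safety" convention can be sketched like this (hypothetical `Repository` class): the function hops to `Dispatchers.IO` itself, so callers on any dispatcher, including a UI main thread, can invoke it without worrying about blocking.

```kotlin
import kotlinx.coroutines.*

class Repository {
    // Main-safe: callers never need to know this does blocking IO,
    // because the function switches dispatchers internally.
    suspend fun load(id: Int): String = withContext(Dispatchers.IO) {
        Thread.sleep(10) // stand-in for a blocking driver call
        "record-$id"
    }
}

fun main() = runBlocking {
    // Safe to call from anywhere; here we just call it from runBlocking.
    println(Repository().load(7)) // record-7
}
```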
h
Yup, certainly makes sense. I'm surprised that Dispatchers.Default isn't the default dispatcher, though; I haven't encountered a use case where I wouldn't want it (other than IO).
s
Well, it is the default when launching a coroutine from a scope that has no other dispatcher. So `GlobalScope.launch` will use `Dispatchers.Default`, and so will `CoroutineScope(EmptyCoroutineContext)` and `suspend fun main() = coroutineScope { ... }`. What you're seeing is probably down to the fact that `runBlocking` actually creates a scope that already comes with a non-default dispatcher. I agree, it can be a bit unexpected.
u
> Often people will specify a dispatcher just for safety, if they're not sure where their code will be called from. That's called "main-safety", since it ensures the code is safe to call from the main thread in UI apps.
And you should. The rule is that no coroutine should block its caller's thread. So if something needs to run on Default/IO, specify it. If you are already on IO/Default, no dispatching should happen.
> I am currently running this in a Lambda, whose handleRequest function already specifies it

```kotlin
fun invoke(request: ValidationLambdaInput): String = runBlocking(Dispatchers.Default) {
    /* ... */
}
```

You mean you are collecting the flow within a `runBlocking(Dispatchers.Default)`? Or are you calling something like my `validationResultFlow` from the runBlocking? Neither of those should require a runBlocking. Setting up the flow is fast and not suspending. And collecting the results does not need to be waited for (I assume), so you could put another launch around it instead of an evil runBlocking. What is the `String` that `fun invoke` returns? If you want to wait for all validation results, then maybe a flow is the wrong tool and we should go back to `async`
h
So the Lambda receives `ValidationLambdaInput`, I process it and then I return the link to the S3 with the result. Within this Lambda, I process the input, get to the rule/request run and then save it.
Should I be using a different scope then?
u
I see. So collecting, and in the collector uploading to S3 and getting the link, all run inside `runBlocking`, as you need to wait for the link and the upload to be completed from non-coroutine code.
👌 1
```kotlin
val validationResults = requests.map { request ->
    async {
        try {
            val validationRules = getActiveValidationRules()
            validateRequest(request, validationRules)
        } catch (e: Exception) {
            handleValidationError(request, e)
        }
    }
}
```
Not sure. Maybe dismissing flows and getting back to something along the lines of your code snippet above might be more appropriate. How would you get that S3 link out of the runBlocking? Could you actually model it as an operation of the flow? Or would you have to put an effect into the state to park it in some variable and then return it?
h
Currently, our integration doesn't really rely on the s3 link, it's just the end operation we agreed upon. Our customer gets the result via SQS, where we're currently working with them to get to a point where we chunk e.g. 100 results and send them. Currently though, the idea is to run everything in parallel, gather all the results and then publish once in SQS
using
async
would be good to then allow me to do the SQS call, something
Copy code
publisher.publishResults(results.await().toList(), request, resultLink)
u
Ok. But either way, you have to wait for the operation to complete. Or could you send to the SQS queue from the collect operator, and start the whole job as a kind of "fire and forget" instead of waiting for it? In summary, I would at least think about getting rid of runBlocking. Coroutines also map well to the classical Java callback approach.

```kotlin
fun CoroutineScope.invoke(request: ValidationLambdaInput, onCompletion: (String) -> Unit) {
    launch(Dispatchers.Default) {
        /* ... */
        onCompletion(s3Link)
    }
}
```
h
Sounds good. Most of the documentation always uses runBlocking, and it just works™; that's the main reason I tend to stick with it. Let me try to replace it with launch 🙂
u
If you are absolutely sure that `fun invoke` is never ever called from a suspend function, it is OK. But "never ever" means it must not even be called indirectly, through a long call chain, from a suspend function. Not today, and not tomorrow when someone else changes the code 🙂 And at least in the general case, this is not easy to be sure of.
h
Yes, of that I'm sure; this Lambda and its code base are not reused anywhere (other than through the actual service).
u
So `invoke` is kind of your `main` function. I guess then that's what runBlocking was designed for.
h
Yes, exactly, definitely my `main` function; there's only a small abstract Lambda handler before it to transform the JSON input with Jackson, which can run synchronously.
Trying to apply this to my code, I'm starting to see that indeed using map + async might be better, as launch seems more geared towards fire-and-forget type operations (which wouldn't work for me, as I want to be able to create a flow for downstream use)
👍 1
u
Also take a look at `Collection<Deferred<T>>.awaitAll` (https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/await-all.html), as it optimizes the error case by failing immediately as soon as one of the deferreds fails.
h
Found an article on flatMapMerge that you might find interesting as well: https://kt.academy/article/flat_map_merge
> The problem is that even though `flatMapMerge` starts a coroutine for each flow created by its transformation, the transformation itself is synchronous. It is visible in the implementation of `flatMapMerge`: the first step is `map`, which is a synchronous operation. That is why the above `studentsFlow` would fetch students synchronously. Here is an executable example:
👍 1
Their suggestion is that, well, it works, but we need to adapt it to run asynchronously:
```kotlin
fun studentsFlow(studentIds: Flow<Int>): Flow<Student> = studentIds
    .flatMapMerge { flow { emit(fetchStudent(it)) } }

private suspend fun fetchStudent(id: Int): Student = ...
```
I think I will adopt this "concurrentFlatMapMerge" approach, where every single item becomes its own flow.
👍 1
shake probably needs a few less flows
🤣 1
very nice 1
Ok, for future users that might come across a similar issue: I'm going to experiment with

```kotlin
@OptIn(ExperimentalCoroutinesApi::class)
fun <T, Y> Flow<T>.concurrentFlatMapMerge(
    maxConcurrency: Int = Int.MAX_VALUE,
    transform: suspend (T) -> Y,
): Flow<Y> = this
    .flatMapMerge(concurrency = maxConcurrency) {
        flow { emit(transform(it)) }
    }
```
Created a unit test with a `Thread.sleep(500)` and did a comparison of running it 5 times: `flatMapMerge` vs `concurrentFlatMapMerge`. Comes to ~3050 ms vs. ~550 ms. Looking forward to either seeing my validation service flying or crashing due to too many concurrent jobs 😄
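As a rough, self-contained illustration of that speedup (item counts and the 100 ms delay are invented, and `delay` is used instead of `Thread.sleep` so no worker thread is blocked): five 100 ms transforms complete in roughly 100 ms total when they run concurrently, rather than 500 ms sequentially.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

@OptIn(ExperimentalCoroutinesApi::class)
fun <T, Y> Flow<T>.concurrentFlatMapMerge(
    maxConcurrency: Int = Int.MAX_VALUE,
    transform: suspend (T) -> Y,
): Flow<Y> = flatMapMerge(concurrency = maxConcurrency) { flow { emit(transform(it)) } }

fun main() = runBlocking {
    val start = System.currentTimeMillis()
    val doubled = (1..5).asFlow()
        .concurrentFlatMapMerge { delay(100); it * 2 } // each item in its own inner flow
        .toList()
    val elapsed = System.currentTimeMillis() - start
    println(doubled.sorted()) // [2, 4, 6, 8, 10]
    println("took ~${elapsed}ms") // the five delays overlap, so well under 500 ms
}
```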
❤️ 1
Thank you both for the help, I'll let you know how it goes!
Morning! Happy to report it worked amazingly: we achieved a 70%+ reduction in runtime. Now I'm even hitting the full Lambda memory, so time to optimise that as well. Long live coroutines!