Hi. I’m just starting learning coroutines and tried to apply them on a simple imaginary use-case inv...
j
Hi. I’m just starting learning coroutines and tried to apply them on a simple imaginary use-case involving using the Spring WebClient (which supports coroutines). I have a web server accepting GET requests, and responding the current time to each request after a delay of 2 seconds (to test parallelism). My goal is to send 10 requests to this server, but to limit the number of concurrent requests to 3 (to avoid spamming it). With reactive code, this can be achieved using
Copy code
fun run(vararg args: String?) {
        (1..10).toFlux()
            .flatMap({ this.get() }, 3)
            .collectList()
            .block()
            ?.forEach { println(it) }
    }

    private fun get(): Mono<String> {
        return webClient.get()
            .uri("/foo")
            .retrieve()
            .bodyToMono<String>()
    }
How to do that with coroutines? I managed to do using
withContext(<http://Dispatchers.IO|Dispatchers.IO>.limitedParallelism(3))
and by making each request block the current thread, but it defeats the purpose of using coroutines, doesn’t it?
s
Project Reactor and Spring WebFlux have their kotlin extensions as
awaitBody()
,
awaitSingle()
awaitExchenge()
etc. to integrate Reactor reactive flow with kotlinx.coroutines. Please read: https://spring.io/blog/2019/04/12/going-reactive-with-spring-coroutines-and-kotlin-flow https://github.com/Kotlin/kotlinx.coroutines/tree/master/reactive/kotlinx-coroutines-reactor https://github.com/Kotlin/kotlinx.coroutines/tree/master/reactive/kotlinx-coroutines-reactive
s
Hey JB, check also Flow documentation like https://kotlinlang.org/docs/flow.html#flows, Spring extensions are listed in https://docs.spring.io/spring-framework/docs/current/kdoc-api/.
j
Thank. I know about awaitBody, but that’s the point of my question: using it instead of blocking, how can I limit the concurrency (i.e. make sure only 3 requests are handled at once by the server)?
I know flows exist, even though I don’t really master them at all, but their usage seems very similar to the usage of Flux (which I know better), so if I have to use a reactive way of programming, I can do that with Flux directly.
Maybe some code would help. This does what I want, but seems silly:
Copy code
fun run() {
        runBlocking {
            withContext(Dispatchers.IO.limitedParallelism(3)) {
                (1..10).map {
                    async { get() }
                }.forEach { println(it.await()) }
            }
        }
    }

    private fun get(): String {
        val body = webClient.get()
            .uri("/foo")
            .retrieve()
            .bodyToMono<String>()
            .block()!!
        return body
    }
But replacing get() by this implementation sends all the requests at once:
Copy code
private suspend fun get(): String {
        return webClient.get()
            .uri("/foo")
            .retrieve()
            .awaitBody()
    }
s
j
Thank you all. My understanding is that, for this use-case, I’d better stick to the reactor API rather than use coroutines. I’m fine with it, but since I’m in the early stages of learning coroutines, I wonder if I’m missing something.
s
I am not a Coroutines expert either, but yeah I think this aspect of Coroutines is not mature yet, and I was surprised as well when I discovered it (coming from Reactive). My guess the Kotlin team has a lot of priorities to manage in parrallel.