jbnizet
01/13/2022, 2:39 PMfun run(vararg args: String?) {
(1..10).toFlux()
.flatMap({ this.get() }, 3)
.collectList()
.block()
?.forEach { println(it) }
}
private fun get(): Mono<String> {
return webClient.get()
.uri("/foo")
.retrieve()
.bodyToMono<String>()
}
jbnizet
01/13/2022, 2:41 PMwithContext(<http://Dispatchers.IO|Dispatchers.IO>.limitedParallelism(3))
and by making each request block the current thread, but it defeats the purpose of using coroutines, doesn’t it?Szymon Jeziorski
01/13/2022, 6:07 PMawaitBody()
, awaitSingle()
awaitExchenge()
etc. to integrate Reactor reactive flow with kotlinx.coroutines.
Please read:
https://spring.io/blog/2019/04/12/going-reactive-with-spring-coroutines-and-kotlin-flow
https://github.com/Kotlin/kotlinx.coroutines/tree/master/reactive/kotlinx-coroutines-reactor
https://github.com/Kotlin/kotlinx.coroutines/tree/master/reactive/kotlinx-coroutines-reactivesdeleuze
01/13/2022, 8:11 PMjbnizet
01/13/2022, 8:46 PMjbnizet
01/13/2022, 8:48 PMjbnizet
01/13/2022, 8:55 PMfun run() {
runBlocking {
withContext(Dispatchers.IO.limitedParallelism(3)) {
(1..10).map {
async { get() }
}.forEach { println(it.await()) }
}
}
}
private fun get(): String {
val body = webClient.get()
.uri("/foo")
.retrieve()
.bodyToMono<String>()
.block()!!
return body
}
But replacing get() by this implementation sends all the requests at once:
private suspend fun get(): String {
return webClient.get()
.uri("/foo")
.retrieve()
.awaitBody()
}
sdeleuze
01/13/2022, 8:56 PMsdeleuze
01/13/2022, 8:59 PMjbnizet
01/14/2022, 1:40 PMsdeleuze
01/14/2022, 5:03 PM