Hey, I am working on Spring Coroutines support. We...
# coroutines
s
Hey, I am working on Spring Coroutines support. WebFlux support (
WebClient
and the functional server API) is currently designed as extensions (https://github.com/spring-projects/spring-fu/tree/master/coroutines/webflux) since the API surface is big and only a small part is IO related. For methods returning
Mono
I am just using
Publisher<T>.awaitSingle()
or
Publisher<T>.awaitFirstOrNull()
, no problem there. For method where I need to create a
Mono
from a Coroutine this is less clear to me. I am currently doing
GlobalScope.mono(Dispatchers.Unconfined) { ... }
since with an extension I don't see how I should use constructs like
fun CoroutineScope.doSometing() = mono { ... }
. How I am supposed to handle that when using extensions? Also I am not really leveraging structured concurrency here, but since I am exposing a Reactive Streams core with Coroutines I am not sure yet if this is an issue or not. Support for Spring Data MongoDB (https://github.com/spring-projects/spring-fu/tree/master/coroutines/data-mongodb) and R2DBC (https://github.com/spring-projects/spring-fu/tree/master/coroutines/data-r2dbc) is based on dedicated Coroutines API since almost all the API is related to IO operation, I have used
Co
prefix like for
CoDatabaseClient
for example (https://github.com/spring-projects/spring-fu/blob/master/coroutines/data-r2dbc/src/main/kotlin/org/springframework/data/r2dbc/function/CoDatabaseClient.kt). Translating
Flux
support depends on https://github.com/Kotlin/kotlinx.coroutines/issues/254 so I have not started this part yet. I also still need to implement interop between Reactor and Coroutines context https://github.com/Kotlin/kotlinx.coroutines/issues/284 This is really the early stage, and I am not yet super familiar with Coroutines, so any feedback welcome.
👍 2
e
Since you are not utilizing structured concurrency, then using
GlobalScope.mono { ... }
is perfectly fine. See, the result is a cold stream and there’s no risk in “leaking” it. Whoever subscribes to the corresponding
Mono
is going to be responsible for disposing it
👍 2
g
Is that true for all
rxObservable/rxSingle...
from
kotlinx-coroutines-rx2
? Basically,
GlobalScope.rxObservable{}
is safe enough, assuming the subscription is being disposed later?
PS. How about hot
rxFlowable
?
e
It similar for most rx types
🙏 1