# coroutines
a
I am doing Coroutines-Rx2 interop. According to https://github.com/Kotlin/kotlinx.coroutines/blob/master/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@ExperimentalCoroutinesApi
public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = GlobalScope.rxSingle(context) {
    this@asSingle.await()
}
Calling `myDeferred.asSingle(coroutineContext)` will immediately call `await()` on the deferred, which effectively starts coroutine execution. Shouldn't this only happen when the Single is subscribed to?
Imagine you have a network call behind a coroutine. I wouldn't want it to be executed every time the method is called, only when the Single is subscribed to.
b
@aaverin CoroutineScope.rxSingle extension creates new single, which does nothing without subscription
a
Would `mySingle.toObservable()` subscribe then?
b
Nope, only `mySingle.toObservable().subscribe()` does. `mySingle.toObservable()` is just a cold observable.
a
When writing this
println("outside")
async {
  println("inside")
}.asSingle(CommonPool)
I get two prints, inside and outside.
There should only be outside until I subscribe.
s
The problem with the code you provided is that `async` runs eagerly, so the `println` is printed eagerly. That behavior comes from `async` and cannot be made lazy by `Single`.
println("outside")
async(start = CoroutineStart.LAZY) {
  println("inside")
}.asSingle(CommonPool)
This should yield your expected result.
So only if the `Deferred` is lazy will the `Single` also be lazy. Otherwise the `Single` just `await`s the eagerly running `Deferred`.
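A minimal sketch of the eager versus lazy difference described above, using only kotlinx.coroutines and no Rx. The function name `eagerVsLazy` and the log strings are made up for illustration; the point is only that a default `async` body runs without anyone awaiting it, while a `CoroutineStart.LAZY` one waits for `await()`.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: records the order of events and returns the log.
fun eagerVsLazy(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    // Default start: the body is scheduled as soon as async is called.
    val eager = async { log += "eager body"; 1 }
    // LAZY start: the body stays idle until start(), join() or await().
    val lazy = async(start = CoroutineStart.LAZY) { log += "lazy body"; 2 }

    eager.join() // wait for the eager coroutine; the lazy one is left untouched
    log += "lazy active before await: ${lazy.isActive}"
    lazy.await() // this call is what triggers the lazy body
    log
}

fun main() {
    eagerVsLazy().forEach(::println)
    // eager body
    // lazy active before await: false
    // lazy body
}
```

Wrapping either `Deferred` in `asSingle` does not change when its body starts; the `Single` only controls when `await()` is called.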
a
Wow, this is one nasty pitfall when using both coroutines and rx2 in the same project. Thanks
s
You're very welcome :) I'd avoid `Deferred` as a return type and use `suspend fun` instead. That decouples the implementation from the execution strategy.
a
What if I depend on a library that returns `Deferred`?
I need to somehow convert it to Rx, but if the internal `async` is not lazy, all execution will happen right away, without a subscription.
s
Create an intermediate `suspend fun`. You need a layer of indirection in between.
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async

object ExternalApi {
  fun helloWorld(): Deferred<Unit> = GlobalScope.async {
    println("Hello World!")
  }
}

suspend fun indirection(): Unit =
  ExternalApi.helloWorld().await()
The `suspend fun` should only be invoked on subscription; this will cause a new deferred to start running.
This requires that `helloWorld` returns a new `Deferred` every time, i.e. a new network request every time and not some cached internal value.
So it's still not a foolproof solution, though. Luckily most frameworks like Retrofit now support `suspend` functions instead of `Deferred`.
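A rough sketch of why the indirection helps, assuming the external API really does create a fresh `Deferred` on every call. `FakeExternalApi`, `callThrough` and `indirectionDemo` are hypothetical names standing in for the real library and wrapper; nothing here involves Rx yet.

```kotlin
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a library that hands out eagerly started Deferreds.
class FakeExternalApi {
    val started = AtomicInteger(0)
    fun call(): Deferred<String> = GlobalScope.async {
        "response #${started.incrementAndGet()}"
    }
}

// The layer of indirection: no Deferred exists until this function is invoked.
suspend fun callThrough(api: FakeExternalApi): String = api.call().await()

fun indirectionDemo(): List<String> = runBlocking {
    val api = FakeExternalApi()
    // Holding the suspend lambda starts nothing.
    val work: suspend () -> String = { callThrough(api) }
    val before = api.started.get()
    // Each invocation creates, and therefore starts, a new Deferred.
    listOf("started before invoking: $before", work(), work())
}

fun main() {
    indirectionDemo().forEach(::println)
    // started before invoking: 0
    // response #1
    // response #2
}
```

If `call()` returned a cached `Deferred` instead, the second invocation would observe the same already running work, which is the caveat mentioned above.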
a
Yes, the full example would be something like
fun convertToProperSingle(): Single<ExternalResponse> {
  // LAZY start keeps the coroutine idle until the Single awaits it on subscription
  return async(start = CoroutineStart.LAZY) { callExternalApi() }
    .asSingle(CommonPool)
}

private suspend fun callExternalApi(): ExternalResponse {
  return externalApi.call().await()
}
s
Is there no way to go from `suspend fun` to `Single` without `Deferred`?
a
Let me see. I think not
s
We have an API for this in Arrow. Similar to the snippet below
fun <A> asSingle(ctx: CoroutineContext, f: suspend () -> A): Single<A> = Single.create { emitter ->
  f.startCoroutine(Continuation(ctx) { result ->
     result.fold({ a -> emitter.onSuccess(a) }, { e -> emitter.onError(e) })
  })
}
This is written from memory, so I am not 100% sure about the `emitter#xxxx` function names.
fun convertToProperSingle(): Single<ExternalResponse> =
  asSingle(CommonPool) { externalApi.call().await() }
a
`public fun <T> Deferred<T>.asSingle`: it's an extension function on `Deferred`
s
This is your snippet updated with the conversion function I provided. There should be something similar in kotlinx tho but I’d have to check.
a
But yes, maybe it will work with your snippet.
Thanks
s
Any time 🙂
z
`CoroutineScope.rxSingle` is already lazy, just use it directly instead of going through the `asSingle` extension.
fun convertToProperSingle(): Single<ExternalResponse> {
  return GlobalScope.rxSingle(CommonPool) {
    callExternalApi()
  }
}

private suspend fun callExternalApi(): ExternalResponse {
  return externalApi.call().await()
}
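A small sketch to check the laziness claim, assuming a kotlinx-coroutines-rx2 version that ships the top-level `rxSingle` builder (in the older versions used in this thread the same builder is the `CoroutineScope.rxSingle` extension) and `Dispatchers.Default` in place of `CommonPool`. `coldSingleDemo` and the fake network call are hypothetical; they only count how often the suspend block runs.

```kotlin
import io.reactivex.Single
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.rx2.rxSingle
import java.util.concurrent.atomic.AtomicInteger

fun coldSingleDemo(): List<String> {
    val networkCalls = AtomicInteger(0)

    // Hypothetical stand-in for externalApi.call().await().
    suspend fun fakeNetworkCall(): String = "response #${networkCalls.incrementAndGet()}"

    // Building the Single does not run the block.
    val single: Single<String> = rxSingle(Dispatchers.Default) { fakeNetworkCall() }
    val callsAfterCreation = networkCalls.get()

    // Each subscription (blockingGet subscribes) starts a new coroutine.
    val first = single.blockingGet()
    val second = single.blockingGet()
    return listOf("calls after creation: $callsAfterCreation", first, second)
}

fun main() {
    coldSingleDemo().forEach(::println)
    // calls after creation: 0
    // response #1
    // response #2
}
```

The block runs once per subscriber, so a caller that needs a shared result would have to add caching on the Rx side, for example with `cache()`.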