#coroutines

aaverin

06/06/2019, 11:07 AM
I am doing Coroutines-Rx2 interop. According to https://github.com/Kotlin/kotlinx.coroutines/blob/master/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt:
```kotlin
@ExperimentalCoroutinesApi
public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = GlobalScope.rxSingle(context) {
    this@asSingle.await()
}
```
calling `myDeferred.asSingle(coroutineContext)` will immediately call `await()` on the deferred, which effectively starts coroutine execution. Shouldn’t this happen only when the `Single` is subscribed to?
Imagine you have a network call behind a coroutine. I wouldn’t want it to be executed every time the method is called, only once the `Single` has been subscribed to.

bezrukov

06/06/2019, 11:17 AM
@aaverin The `CoroutineScope.rxSingle` extension creates a new `Single`, which does nothing until it is subscribed to.

aaverin

06/06/2019, 11:19 AM
Would `mySingle.toObservable()` subscribe then?

bezrukov

06/06/2019, 11:20 AM
Nope, only `mySingle.toObservable().subscribe()` would. `mySingle.toObservable()` on its own is just a cold observable.
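A minimal sketch of that cold behavior, assuming RxJava 2 and kotlinx-coroutines-rx2 1.3 or later on the classpath (that version provides the top-level `rxSingle` builder; with the version discussed in this thread the equivalent call would be `GlobalScope.rxSingle`). The helper `coldSingle` and its counter are hypothetical names used only for illustration:

```kotlin
import io.reactivex.Single
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.rx2.rxSingle
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: the block bumps the counter each time it actually runs.
fun coldSingle(runs: AtomicInteger): Single<Int> =
    rxSingle(Dispatchers.Default) { runs.incrementAndGet() }

fun main() {
    val runs = AtomicInteger(0)

    // Building the Single and converting it does not run the coroutine block.
    val observable = coldSingle(runs).toObservable()
    println("runs before subscribe: ${runs.get()}")

    // blockingFirst() subscribes, which is what starts the block.
    val value = observable.blockingFirst()
    println("value: $value, runs after subscribe: ${runs.get()}")
}
```

The counter stays at 0 until `blockingFirst()` subscribes, so neither `rxSingle` nor `toObservable()` triggers any work by itself.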

aaverin

06/06/2019, 11:24 AM
When writing this
```kotlin
println("outside")
async {
    println("inside")
}.asSingle(CommonPool)
```
I get two prints, "inside" and "outside".
There should only be "outside" until I subscribe.
The problem with the code you provided is that `async` runs eagerly, so the `println` is executed eagerly. That behavior comes from `async` and cannot be made lazy by `Single`.
```kotlin
println("outside")
async(start = CoroutineStart.LAZY) {
  println("inside")
}.asSingle(CommonPool)
```
This should yield your expected result.
So the `Single` is lazy only if the `Deferred` is lazy. Otherwise the `Single` just `await`s the eagerly running `Deferred`.
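The difference can be observed side by side. This is a sketch under stated assumptions: kotlinx-coroutines-rx2 and RxJava 2 are on the classpath, `Dispatchers.Default` stands in for the `CommonPool` used in the thread, and `Observation` and `observe` are hypothetical names introduced for the demonstration:

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.rx2.asSingle
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical result holder for the demonstration.
data class Observation(
    val eagerRanBeforeSubscribe: Boolean,
    val lazyRanBeforeSubscribe: Boolean,
    val values: List<String>
)

fun observe(): Observation = runBlocking {
    val eagerRan = AtomicBoolean(false)
    val lazyRan = AtomicBoolean(false)

    // Default start mode: the body is scheduled as soon as async returns.
    val eager = async(Dispatchers.Default) { eagerRan.set(true); "eager" }
    // LAZY start mode: the body waits for start(), join() or await().
    val lazyDeferred = async(Dispatchers.Default, start = CoroutineStart.LAZY) {
        lazyRan.set(true); "lazy"
    }

    val eagerSingle = eager.asSingle(Dispatchers.Default)
    val lazySingle = lazyDeferred.asSingle(Dispatchers.Default)

    // Wait for the eager body to finish; nothing has been subscribed yet.
    eager.join()
    val eagerBefore = eagerRan.get()
    val lazyBefore = lazyRan.get()

    // Subscribing makes each Single call await(); that is what starts the lazy Deferred.
    val values = listOf(eagerSingle.blockingGet(), lazySingle.blockingGet())
    Observation(eagerBefore, lazyBefore, values)
}

fun main() {
    println(observe())
}
```

Before any subscription the eager body has already run while the lazy one has not; both `Single`s still deliver their value once subscribed.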

aaverin

06/06/2019, 12:34 PM
Wow, this is one nasty pitfall when using both coroutines and rx2 in the same project. Thanks

simon.vergauwen

06/06/2019, 12:36 PM
You're very welcome :) I'd avoid `Deferred` as a return type and use `suspend fun` instead. That decouples the implementation from the execution strategy.

aaverin

06/06/2019, 12:37 PM
What if I depend on a library that returns `Deferred`?
I need to somehow convert it to Rx, but if the `async` inside the library is not lazy, all execution happens right away, without a subscription.

simon.vergauwen

06/06/2019, 12:39 PM
Create an intermediate `suspend fun`. You need a layer of indirection in between.
```kotlin
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async

object ExternalApi {
  fun helloWorld(): Deferred<Unit> = GlobalScope.async {
    println("Hello World!")
  }
}

suspend fun indirection(): Unit =
  ExternalApi.helloWorld().await()
```
The `suspend fun` should only be invoked on subscription, which will cause a new `Deferred` to start running.
This requires that `helloWorld` returns a new `Deferred` every time, i.e. a new network request every time and not some cached internal value.
So it's still not a foolproof solution tho. Luckily, most frameworks like Retrofit now have support for `suspend` functions instead of `Deferred`.
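The indirection can be checked with a call counter. The sketch below assumes RxJava 2 and kotlinx-coroutines-rx2 1.3 or later (for the top-level `rxSingle` builder); `FakeExternalApi`, `callExternalApi` and `asColdSingle` are hypothetical stand-ins, not part of any real library:

```kotlin
import io.reactivex.Single
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.rx2.rxSingle
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a library that returns an eagerly started Deferred.
class FakeExternalApi {
    private val scope = CoroutineScope(Dispatchers.Default)
    val calls = AtomicInteger(0)

    // Each invocation starts a fresh coroutine and counts as one "request".
    fun call(): Deferred<Int> = scope.async { calls.incrementAndGet() }
}

// The layer of indirection: nothing happens until this function is invoked.
suspend fun callExternalApi(api: FakeExternalApi): Int = api.call().await()

// The suspend function is only invoked inside the rxSingle block, i.e. on subscription.
fun asColdSingle(api: FakeExternalApi): Single<Int> =
    rxSingle(Dispatchers.Default) { callExternalApi(api) }

fun main() {
    val api = FakeExternalApi()
    val single = asColdSingle(api)
    println("calls before subscribe: ${api.calls.get()}")
    println("first subscription: ${single.blockingGet()}")
    println("second subscription: ${single.blockingGet()}")
}
```

Each subscription issues its own call because `call()` hands out a new `Deferred` every time. If the library cached and returned the same `Deferred`, later subscriptions would only re-await the cached result, which is the caveat described above.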

aaverin

06/06/2019, 12:45 PM
Yes, the full example would be something like
```kotlin
fun convertToProperSingle(): Single<ExternalResponse> {
  return async(start = CoroutineStart.LAZY) { callExternalApi() }
    .asSingle(CommonPool)
}

private suspend fun callExternalApi(): ExternalResponse {
  return externalApi.call().await()
}
```

simon.vergauwen

06/06/2019, 12:46 PM
Is there no way to go from `suspend fun` to `Single` without `Deferred`?

aaverin

06/06/2019, 12:46 PM
Let me see. I think not

simon.vergauwen

06/06/2019, 12:48 PM
We have an API for this in Arrow, similar to the snippet below.
```kotlin
import io.reactivex.Single
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

fun <A : Any> asSingle(ctx: CoroutineContext, f: suspend () -> A): Single<A> = Single.create { emitter ->
  f.startCoroutine(Continuation(ctx) { result ->
     result.fold({ a -> emitter.onSuccess(a) }, { e -> emitter.onError(e) })
  })
}
```
This is written from memory, so I am not 100% sure about the `emitter#xxxx` function names.
```kotlin
fun convertToProperSingle(): Single<ExternalResponse> =
  asSingle(CommonPool) { externalApi.call().await() }
```

aaverin

06/06/2019, 12:49 PM
`public fun <T> Deferred<T>.asSingle`: it’s an extension function on `Deferred`

simon.vergauwen

06/06/2019, 12:50 PM
This is your snippet updated with the conversion function I provided. There should be something similar in kotlinx tho but I’d have to check.

aaverin

06/06/2019, 12:50 PM
But yes, maybe it will work with your snippet.
Thanks

simon.vergauwen

06/06/2019, 12:50 PM
Any time 🙂

Zach Klippenstein (he/him) [MOD]

06/06/2019, 3:14 PM
`CoroutineScope.rxSingle` is already lazy, just use it directly instead of going through the `asSingle` extension.
```kotlin
fun convertToProperSingle(): Single<ExternalResponse> {
  return GlobalScope.rxSingle(CommonPool) {
    callExternalApi()
  }
}

private suspend fun callExternalApi(): ExternalResponse {
  return externalApi.call().await()
}
```