#coroutines

AJ Alt

10/30/2019, 6:24 PM
I want to write a function that takes a suspend lambda and returns a custom future type, like the `CoroutineScope.future` or `rxSingle` functions from the integration libraries. Those functions are all implemented with `AbstractCoroutine`, which is marked as internal. Is there a recommended way to implement that type of functionality without internal classes, or is there no way around internal APIs at the moment?

Ilmir Usmanov [JB]

10/30/2019, 6:36 PM
You, sir, are a lucky man, because I just showed an example of doing this without kotlinx.coroutines the other week:
import kotlin.coroutines.*

class AsyncContinuation<T>: Continuation<T> {
    var result: T? = null
    override val context: CoroutineContext
        get() = EmptyCoroutineContext

    override fun resumeWith(result: Result<T>) {
        this.result = result.getOrThrow()
    }
}

class Async<T>(val ac: AsyncContinuation<T>) {
    suspend fun await(): T =
        suspendCoroutine<T> {
            it.resume(ac.result!!)
        }
}

fun <T> async(c: suspend () -> T): Async<T> {
    val ac = AsyncContinuation<T>()
    c.startCoroutine(ac)
    return Async(ac)
}

suspend fun main() {
    val async = async {
        "OK"
    }
    println(async.await())
}
Of course, this is just an example and it has some drawbacks: 1. It does not support structured concurrency; 2. It is single threaded; 3. `await` is still `suspend`.

octylFractal

10/30/2019, 6:40 PM
if you need `await` to be non-suspend, there's always `runBlocking`

AJ Alt

10/30/2019, 6:43 PM
Thanks for the example. I'm not trying to avoid kotlinx.coroutines, just internal APIs.
I'm trying to implement a function like `fun <T> future(coro: suspend () -> T): MyCompletableFuture<T>`, which is a bit different from that example.

octylFractal

10/30/2019, 6:56 PM
if it's specifically completable, what's wrong with using an extension function on `CoroutineScope`, calling `launch` and completing it inside the coroutine?
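A minimal sketch of this suggestion, using only public kotlinx.coroutines APIs. It assumes `java.util.concurrent.CompletableFuture` as a stand-in for the custom future type, and the name `futureOf` is hypothetical:

```kotlin
import java.util.concurrent.CompletableFuture
import kotlinx.coroutines.*

// Hypothetical builder: CompletableFuture stands in for the custom future type.
fun <T> CoroutineScope.futureOf(block: suspend () -> T): CompletableFuture<T> {
    val future = CompletableFuture<T>()
    launch {
        try {
            future.complete(block())
        } catch (e: Throwable) {
            // Report the failure through the future instead of failing the scope.
            future.completeExceptionally(e)
        }
    }.invokeOnCompletion { cause ->
        // Covers cancellation before the body ever ran; a no-op if already completed.
        if (cause != null) future.completeExceptionally(cause)
    }
    return future
}

fun main() {
    val scope = CoroutineScope(Dispatchers.Default)
    val future = scope.futureOf {
        delay(10)
        "OK"
    }
    // Blocks the main thread, not a coroutine thread, until the result is ready.
    println(future.get())
    scope.cancel()
}
```

Calling `future.get()` from inside `runBlocking` on the same scope would block the thread the coroutine needs to run on, which is why the sketch launches on `Dispatchers.Default`.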

Dico

10/30/2019, 7:30 PM
^
@Ilmir Usmanov [JB] Your code never suspends. The second your lambda suspends, your solution will fail, because you are not notifying any waiting continuation anywhere. The code depends on the operation having completed before `await` gets called.
The easiest way to do what you want is to wrap a `Deferred` value.
@Ilmir Usmanov [JB] To clarify, these two are functionally equivalent:
suspend fun await() = suspendCoroutine { it.resume(result!!) }
suspend fun await() = result!!
The continuation `it` needs to be registered somewhere and resumed when the result is known.

Ilmir Usmanov [JB]

10/30/2019, 8:02 PM
@Dico You are right, I accidentally copy-pasted the wrong version. Here is the full version:
import kotlin.coroutines.*

class AsyncContinuation<T>: Continuation<T> {
    var result: T? = null
    var completion: Continuation<T>? = null
    override val context: CoroutineContext
        get() = EmptyCoroutineContext

    override fun resumeWith(result: Result<T>) {
        this.result = result.getOrThrow()
        completion?.resumeWith(result)
    }
}

class Async<T>(val ac: AsyncContinuation<T>) {
    suspend fun await(): T =
        suspendCoroutine<T> {
            val res = ac.result
            if (res != null)
                it.resume(res)
            else
                ac.completion = it
        }
}

fun <T> async(c: suspend () -> T): Async<T> {
    val ac = AsyncContinuation<T>()
    c.startCoroutine(ac)
    return Async(ac)
}

fun builder(c: suspend() -> Unit) {
    c.startCoroutine(object : Continuation<Unit>{
        override val context: CoroutineContext
            get() = EmptyCoroutineContext

        override fun resumeWith(result: Result<Unit>) {
            result.getOrThrow()
        }
    })
}

fun main() {
    var c: Continuation<String>? = null
    builder {
        val async = async {
            suspendCoroutine<String> { c = it }
        }
        println(async.await())
    }
    c?.resume("OK")
}

AJ Alt

10/30/2019, 8:13 PM
Kenzie, that's a great idea. Thanks everyone for the input!

Ilmir Usmanov [JB]

10/30/2019, 8:14 PM
To be completely honest, I do not like this part
override fun resumeWith(result: Result<T>) {
    this.result = result.getOrThrow()
    completion?.resumeWith(result)
}
It is better to throw the exception only if the lambda did not suspend, i.e. rewrite it to something like
override fun resumeWith(result: Result<T>) {
    if (completion != null) completion!!.resumeWith(result)
    else this.result = result.getOrThrow()
}

Dico

10/31/2019, 9:01 AM
It's better to store the exception until `await` is called so it can be passed down, no?

Ilmir Usmanov [JB]

10/31/2019, 9:53 AM
Since `async` in the example is greedy, there is no harm in throwing the exception right away (especially taking into account that calling `await` is not necessary). If the lambda did not suspend, there is no reason to store the exception; `AsyncContinuation` is the root continuation anyway. If the lambda (and, consequently, `await`) did suspend, we need to resume `await` in order not to leak its continuation.

Dico

11/01/2019, 2:02 PM
Sorry, I don't understand what you mean. I'm commenting on the `getOrThrow` usage. In the event that this throws (when there isn't a continuation yet), `await` will never resume. Surely you want this to be thrown in the stack frame of the `await` caller. This is problematic when `await` is not called immediately.
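One way to read this point, as a sketch rather than a definitive fix: store the whole `Result<T>` instead of the unwrapped value, so a failure is kept until `await` is called and is rethrown in the caller's frame. It is a variation of the example above, still single threaded and without structured concurrency; the property names `outcome` and `waiter` are hypothetical.

```kotlin
import kotlin.coroutines.*

class AsyncContinuation<T> : Continuation<T> {
    // null means "not completed yet"; a completed null value is Result.success(null).
    var outcome: Result<T>? = null
    var waiter: Continuation<T>? = null

    override val context: CoroutineContext
        get() = EmptyCoroutineContext

    override fun resumeWith(result: Result<T>) {
        outcome = result            // keep success or failure, never throw here
        waiter?.resumeWith(result)  // wake up await() if it is already suspended
    }
}

class Async<T>(private val ac: AsyncContinuation<T>) {
    suspend fun await(): T = suspendCoroutine { cont ->
        val outcome = ac.outcome
        if (outcome != null) cont.resumeWith(outcome) // done: value or exception reaches the caller
        else ac.waiter = cont                         // not done: register for later
    }
}

fun <T> async(c: suspend () -> T): Async<T> {
    val ac = AsyncContinuation<T>()
    c.startCoroutine(ac)
    return Async(ac)
}

suspend fun main() {
    // The failure is stored, so creating the Async does not throw.
    val failing = async<String> { throw IllegalStateException("boom") }
    try {
        failing.await()
    } catch (e: IllegalStateException) {
        println("caught in await caller: ${e.message}")
    }
}
```

Because completion is tracked by whether `outcome` is set rather than by whether the value is non-null, a lambda that returns `null` is also handled.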

Ilmir Usmanov [JB]

11/01/2019, 2:04 PM
I see your point.

Dico

11/01/2019, 2:07 PM
Ok, sorry for going on and on commenting on the code :)
But there are many traps and things to consider to implement it correctly, which is why I like to use `CompletableDeferred`. For example, if the suspend function produces null, you would have the same problem.
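A hedged sketch of the `CompletableDeferred` approach mentioned here. The wrapper type `MyFuture` and the builder `myFuture` are hypothetical names; only public kotlinx.coroutines APIs are used.

```kotlin
import kotlinx.coroutines.*

// Hypothetical custom future type that delegates to a Deferred.
class MyFuture<T>(private val deferred: Deferred<T>) {
    suspend fun await(): T = deferred.await()
}

// Hypothetical builder: runs the block in this scope and completes the deferred.
fun <T> CoroutineScope.myFuture(block: suspend () -> T): MyFuture<T> {
    val deferred = CompletableDeferred<T>()
    launch {
        try {
            deferred.complete(block())
        } catch (e: Throwable) {
            // The failure is stored and rethrown from await().
            deferred.completeExceptionally(e)
        }
    }
    return MyFuture(deferred)
}

fun main(): Unit = runBlocking {
    // A null result is delivered like any other value.
    val empty = myFuture<String?> {
        delay(10)
        null
    }
    println(empty.await())

    // A failure surfaces in the await caller, however late await is called.
    val failing = myFuture<String> { error("boom") }
    try {
        failing.await()
    } catch (e: IllegalStateException) {
        println("caught: ${e.message}")
    }
}
```

`CompletableDeferred` takes care of registering and resuming waiting continuations, and of thread safety, which the hand-written continuation in this thread leaves to the author.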