# coroutines
a
I want to write a function that takes a suspend lambda and returns a custom future type, like the `CoroutineScope.future` or `rxSingle` functions from the integrations libraries. Those functions are all implemented with `AbstractCoroutine`, which is marked as internal. Is there a recommended way to implement that type of functionality without internal classes, or is there no way around internal APIs at the moment?
i
You, sir, are a lucky man, because I just showed an example of doing this without kotlinx.coroutines the other week:
import kotlin.coroutines.*

class AsyncContinuation<T>: Continuation<T> {
    var result: T? = null
    override val context: CoroutineContext
        get() = EmptyCoroutineContext

    override fun resumeWith(result: Result<T>) {
        this.result = result.getOrThrow()
    }
}

class Async<T>(val ac: AsyncContinuation<T>) {
    suspend fun await(): T =
        suspendCoroutine<T> {
            it.resume(ac.result!!)
        }
}

fun <T> async(c: suspend () -> T): Async<T> {
    val ac = AsyncContinuation<T>()
    c.startCoroutine(ac)
    return Async(ac)
}

suspend fun main() {
    val async = async {
        "OK"
    }
    println(async.await())
}
Of course, this is just an example and it has some drawbacks: 1. It does not support structured concurrency; 2. It is single-threaded; 3. `await` is still `suspend`.
o
If you need `await` to be non-suspend, there's always `runBlocking`.
a
Thanks for the example. I'm not trying to avoid kotlinx.coroutines, just internal APIs.
I'm trying to implement a function like `fun <T> future(coro: suspend () -> T): MyCompletableFuture<T>`, which is a bit different from that example.
o
If it's specifically completable, what's wrong with using an extension function on `CoroutineScope`, calling `launch`, and completing it inside the coroutine?
d
^
@Ilmir Usmanov [JB] Your code never suspends. The moment your lambda suspends, your solution will fail, because you are not notifying any waiting continuation anywhere. The code depends on the operation having completed before `await` gets called.
The easiest way to do what you want is to wrap a `Deferred` value.
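One way that wrapping could look, as a rough sketch assuming kotlinx.coroutines. The names `MyFuture` and `myAsync` are hypothetical and not part of any library; the wrapper only delegates to the public `Deferred` API.

```kotlin
import kotlinx.coroutines.*

// Hypothetical custom future type that delegates everything to a Deferred.
class MyFuture<T>(private val deferred: Deferred<T>) {
    val isDone: Boolean get() = deferred.isCompleted
    suspend fun await(): T = deferred.await()
    fun cancel() = deferred.cancel()
}

// Hypothetical builder: starts the lambda with async and wraps the resulting Deferred.
fun <T> CoroutineScope.myAsync(block: suspend () -> T): MyFuture<T> =
    MyFuture(async { block() })

fun main() = runBlocking {
    val f = myAsync {
        delay(10) // a real suspension point; Deferred resumes the awaiting coroutine afterwards
        "OK"
    }
    println(f.await()) // prints "OK"
}
```

Note that with a plain `async`, a failure in the lambda also cancels the parent scope, so a supervisor scope may be needed if failures should only surface through `await`.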
@Ilmir Usmanov [JB] To clarify, these two are functionally equivalent:
suspend fun await() = suspendCoroutine { it.resume(result!!) }
suspend fun await() = result!!
The continuation `it` needs to be registered somewhere and resumed when the result is known.
i
@Dico You are right, I accidentally copy-pasted the wrong version. Here is the full version:
import kotlin.coroutines.*

class AsyncContinuation<T>: Continuation<T> {
    var result: T? = null
    var completion: Continuation<T>? = null
    override val context: CoroutineContext
        get() = EmptyCoroutineContext

    override fun resumeWith(result: Result<T>) {
        this.result = result.getOrThrow()
        completion?.resumeWith(result)
    }
}

class Async<T>(val ac: AsyncContinuation<T>) {
    suspend fun await(): T =
        suspendCoroutine<T> {
            val res = ac.result
            if (res != null)
                it.resume(res)
            else
                ac.completion = it
        }
}

fun <T> async(c: suspend () -> T): Async<T> {
    val ac = AsyncContinuation<T>()
    c.startCoroutine(ac)
    return Async(ac)
}

fun builder(c: suspend() -> Unit) {
    c.startCoroutine(object : Continuation<Unit>{
        override val context: CoroutineContext
            get() = EmptyCoroutineContext

        override fun resumeWith(result: Result<Unit>) {
            result.getOrThrow()
        }
    })
}

fun main() {
    var c: Continuation<String>? = null
    builder {
        val async = async {
            suspendCoroutine<String> { c = it }
        }
        println(async.await())
    }
    c?.resume("OK")
}
a
Kenzie, that's a great idea. Thanks everyone for the input!
i
To be completely honest, I do not like this part
override fun resumeWith(result: Result<T>) {
    this.result = result.getOrThrow()
    completion?.resumeWith(result)
}
It is better to throw the exception only if the lambda did not suspend, i.e. rewrite it to something like:
override fun resumeWith(result: Result<T>) {
    if (completion != null) completion!!.resumeWith(result)
    else this.result = result.getOrThrow()
}
d
It's better to store the exception until `await` is called so it can be passed down, no?
i
Since `async` in the example is greedy, there is no harm in throwing the exception right away (especially taking into account that calling `await` is not necessary). If the lambda did not suspend, there is no reason to store the exception; `AsyncContinuation` is the root continuation anyway. If the lambda (and, consequently, `await`) did suspend, we need to resume `await` in order not to leak its continuation.
d
Sorry, I don't understand what you mean. I'm commenting on the `getOrThrow` usage. In the event that this throws (when there isn't a continuation yet), `await` will never resume. Surely you want this to be thrown in the stack frame of the `await` caller. This is problematic when `await` is not called immediately.
i
I see your point.
d
Ok, sorry for going on and on commenting on the code:)
But there are many traps and things to consider to implement it correctly, which is why I like to use `CompletableDeferred`. What if the suspend function produces null, for example? You would have the same problem.
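For the stdlib-only version discussed above, one possible way around both traps (a null result and an exception thrown before `await` is called) is to store the whole `Result<T>` instead of the unwrapped value. This is only a sketch: it assumes Kotlin 1.5 or later (for a `Result`-typed property), it is still single-threaded and not thread-safe, and it supports a single awaiting continuation.

```kotlin
import kotlin.coroutines.*

class AsyncContinuation<T> : Continuation<T> {
    // null means "not finished yet". A finished coroutine always stores a non-null
    // Result, even if the value inside is null or the coroutine failed.
    var result: Result<T>? = null
    var completion: Continuation<T>? = null
    override val context: CoroutineContext
        get() = EmptyCoroutineContext

    override fun resumeWith(result: Result<T>) {
        this.result = result               // keep value or exception for a later await()
        completion?.resumeWith(result)     // wake up await() if it is already waiting
    }
}

class Async<T>(private val ac: AsyncContinuation<T>) {
    suspend fun await(): T = suspendCoroutine { cont ->
        val res = ac.result
        if (res != null) cont.resumeWith(res) // already finished: deliver value or exception
        else ac.completion = cont             // still running: register and suspend
    }
}

fun <T> async(c: suspend () -> T): Async<T> {
    val ac = AsyncContinuation<T>()
    c.startCoroutine(ac)
    return Async(ac)
}

suspend fun main() {
    val nullable = async<String?> { null }
    println(nullable.await()) // prints "null": the value is not mistaken for "pending"

    val failing = async<String> { error("boom") }
    // The exception surfaces in the await() caller, not inside async().
    println(runCatching { failing.await() }.exceptionOrNull()?.message) // prints "boom"
}
```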