# coroutines
m
I'm having some weird behavior with flows; on this one flow, doing
flow.singleOrNull()
always returns null, whereas
flow.collect { println(it) }
does print the expected result. Why does this happen?
j
Could you please share a reproducible example?
s
Perhaps there is more than one element?
m
Yes, isn't single() supposed to iterate over them one at a time in that case?
I couldn't find any other way to collect only one element at a time
s
A terminal operator always consumes the entire flow. If you want to iterate it one element at a time, you'd need to launch a coroutine to collect it into a channel, with
produceIn
.
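A rough sketch of what that could look like, assuming `kotlinx-coroutines-core` is on the classpath. `numbers` and `takeTwoByHand` are made-up names for illustration. Depending on the library version, `produceIn` may produce an opt-in warning. Note that the channel buffers, so the flow can run a little ahead of what you have received.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.runBlocking

// Hypothetical example flow; each emission is logged so the stepping is visible.
fun numbers(): Flow<Int> = flow {
    for (i in 1..3) {
        println("emitting $i")
        emit(i)
    }
}

// Pulls the first two elements by hand instead of through a collect callback.
fun takeTwoByHand(): List<Int> = runBlocking {
    // produceIn launches a coroutine in this scope that collects the flow into a channel.
    val channel = numbers().produceIn(this)
    val first = channel.receive()
    val second = channel.receive()
    // Cancel the channel so the collecting coroutine stops and runBlocking can return.
    channel.cancel()
    listOf(first, second)
}

fun main() {
    println(takeTwoByHand())
}
```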
In the docs for
singleOrNull
you'll see that it returns null if there are no elements or if more than one element is found.
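A small sketch of the difference, assuming `kotlinx-coroutines-core` is available (`singleOrNullCases` is just an illustrative name). If you only want the first element, `firstOrNull` is probably closer to what you expected from `singleOrNull`.

```kotlin
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.singleOrNull
import kotlinx.coroutines.runBlocking

fun singleOrNullCases(): List<Int?> = runBlocking {
    listOf(
        flowOf(1).singleOrNull(),        // exactly one element: that element
        flowOf(1, 2).singleOrNull(),     // more than one element: null
        emptyFlow<Int>().singleOrNull(), // no elements: null
        flowOf(1, 2).firstOrNull()       // first element, then collection stops
    )
}

fun main() {
    println(singleOrNullCases()) // prints [1, null, null, 1]
}
```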
m
Hmm, is that the only solution? I tried using Sequences before this because they perfectly do what I need (lazily executing step-by-step), but they error with "restricted suspending functions can only invoke member or extension functions on their restricted coroutine scope"
s
Yes. A flow isn't stateful. If you want to iterate it statefully, you need a coroutine to hold the state.
Very often there's a way to use a flow without a stateful channel/iterator, though! Depends what you're trying to do.
m
Basically I'm working on a Lua interpreter based on kotlinx coroutines, since sequences are nearly identical to how Lua's coroutines are implemented (except for sending data forward, but I have a workaround). I then ran into the above issue where sequences would not work when the SequenceScope is part of a context receiver (i.e.
context(SequenceScope<T>)
), so I switched to Flows instead which did not have this limitation, but were more confusing to use
s
Definitely an unusual use case! 😁
m
Unfortunately most of the documentation for flows seems to require using their emitted results in callbacks, which doesn't really work well for my use-case
s
Flows deliberately don't allow the type of iteration that sequences do, because a flow makes some guarantees about completion handlers (code that runs when the flow terminates) that can't be guaranteed when using a hand-cranked iterator. But more generally, flows are asynchronous generators; sequences are synchronous ones. You don't actually need
kotlinx.coroutines
to use the
sequence { ... }
generator.
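For reference, a minimal stdlib-only sketch of that step-by-step behaviour (`steps` and the `log` parameter are made up for the example). Nothing in the block runs until the iterator asks for the next value.

```kotlin
// Uses only the Kotlin standard library; no kotlinx.coroutines dependency.
fun steps(log: MutableList<String>): Sequence<String> = sequence {
    log += "step 1 ran"
    yield("a")
    log += "step 2 ran"
    yield("b")
}

fun main() {
    val log = mutableListOf<String>()
    val iterator = steps(log).iterator()
    println(log)             // prints [] because the block has not started yet
    println(iterator.next()) // prints a
    println(log)             // prints [step 1 ran]
    println(iterator.next()) // prints b
    println(log)             // prints [step 1 ran, step 2 ran]
}
```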
Context receivers aren't released yet, which is probably why they don't work with
@RestrictsSuspension
-- I would expect that to be fixed before a proper release.
Until then, if you really want to use context receivers, maybe you could write your own custom implementation? After all, the point of
suspendCoroutine
is that it's designed to be compatible with many different paradigms, not just
kotlinx.coroutines
! Could be a fun project to dig into how the sequence builder is implemented in the stdlib and copy the basic idea to make your own generators.
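To give an idea of the shape, here is a rough stdlib-only sketch of a generator that also passes values back in on each resume. Every name in it (`TwoWayScope`, `TwoWayGenerator`, `Step`, `yieldValue`, `send`) is invented for this example and is not an existing API. One assumption to flag: the scope is not annotated with `@RestrictsSuspension`, so nothing stops the block from calling unrelated suspend functions, which this simple design does not handle. The next step is stored as a plain `(In) -> Unit` function rather than a `Continuation<In>`, so the very first step (which starts the block) and later steps (which resume a `yieldValue` call) share one type.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.createCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

// All names here are hypothetical, for illustration only.
sealed interface Step<out T> {
    data class Yielded<T>(val value: T) : Step<T>
    object Finished : Step<Nothing>
}

interface TwoWayScope<In, Out> {
    // Hands [value] to the caller and suspends until the caller sends the next input.
    suspend fun yieldValue(value: Out): In
}

class TwoWayGenerator<In, Out>(
    block: suspend TwoWayScope<In, Out>.(In) -> Unit
) : TwoWayScope<In, Out>, Continuation<Unit> {
    private var pending: Step<Out> = Step.Finished
    private var failure: Throwable? = null

    // The first step starts the block with the first input; yieldValue replaces it later.
    private var nextStep: ((In) -> Unit)? = { input ->
        val body: suspend () -> Unit = { block(this, input) }
        body.createCoroutine(this).resume(Unit)
    }

    override val context: CoroutineContext get() = EmptyCoroutineContext

    override suspend fun yieldValue(value: Out): In = suspendCoroutine { cont ->
        pending = Step.Yielded(value)
        nextStep = { input -> cont.resume(input) }
    }

    // Called once, when the block returns or throws.
    override fun resumeWith(result: Result<Unit>) {
        pending = Step.Finished
        failure = result.exceptionOrNull()
    }

    // Runs the block until its next yieldValue call or until it ends.
    fun send(input: In): Step<Out> {
        val step = nextStep ?: error("generator has already finished")
        nextStep = null
        step(input)
        failure?.let { throw it }
        return pending
    }
}

fun main() {
    val generator = TwoWayGenerator<Int, String> { first ->
        var input = first
        while (input >= 0) {
            input = yieldValue("got $input")
        }
    }
    println(generator.send(1))
    println(generator.send(2))
    println(generator.send(-1) == Step.Finished)
}
```

Each `send` call plays the role of Lua's `coroutine.resume`, and `yieldValue` the role of `coroutine.yield`; whether that maps cleanly onto your interpreter is something you'd have to check.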
m
I've been trying to follow the example of SequenceBuilderIterator, but how would I initialize nextStep? It needs to be a Continuation<List<TValue<*>>> to be able to receive the arguments passed forward, but block.createCoroutine only returns a Continuation<Unit>.
private class LuaCoroutineScopeImpl : LuaCoroutineScope, LuaCoroutineCommunication, Continuation<Unit> {
    enum class State {
        READY, SUSPENDED, DEAD, FAILED
    }

    private var state = State.READY
    private var nextValue: LuaStatus? = null
    var nextStep: Continuation<List<TValue<*>>>? = null

    override fun send(values: List<TValue<*>>): LuaStatus {
        if (hasNext(values)) {
            return next()
        } else {
            throw IllegalStateException("No next value")
        }
    }

    private fun hasNext(values: List<TValue<*>>): Boolean {
        while (true) {
            when (state) {
                State.READY, State.SUSPENDED -> {
                    val step = nextStep ?: throw IllegalStateException("No next step")
                    nextStep = null
                    step.resume(values)
                    return true
                }
                State.DEAD -> return false
                State.FAILED -> return false
            }
        }
    }

    private fun next(): LuaStatus {
        val value = nextValue ?: throw IllegalStateException("No next value")
        nextValue = null
        return value
    }


    override fun resumeWith(result: Result<Unit>) {
        result.getOrThrow()
        if (state == State.FAILED || state == State.DEAD) {
            throw IllegalStateException("Resuming coroutine in bad state: $state")
        }
    }

    override val context: CoroutineContext
        get() = EmptyCoroutineContext
}

suspend fun createLuaScope(block: suspend LuaCoroutineScope.() -> Unit): LuaCoroutineCommunication {
    val scope = LuaCoroutineScopeImpl()
    // This doesn't work since nextStep requires a Continuation<List<TValue<*>>>
    // and block.createCoroutine returns a Continuation<Unit>:
    // scope.nextStep = block.createCoroutine(scope, scope)
    return scope
}