# coroutines
j
Is there a way to suspend while waiting for the next value of a MutableStateFlow?
val state = MutableStateFlow<Int>(1)

launch(Dispatchers.Default) {
    var counter = 1
    while (true) {
        state.emit(counter++)
        delay(1000)
    }
}

launch(Dispatchers.Default) {
    while (state.value < 1000) {
        print(state.value)
        delay(250)
    }
}
Currently this is printing
111122223333444...
but I want it to print
1234...
Use-case: I have a state inside a driver that runs across a socket that I'm building; before the next action is executed, it needs to wait until the state the socket writes back is IDLE, but if the state is already IDLE, it shouldn't have to wait for a state change.
Similarly with MutableSharedFlow with a replay of 1: it does not suspend when using first() and keeps on printing 1111222..., and using last() just never finishes since values are still being emitted:
val state = MutableSharedFlow<Int>(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)

launch(Dispatchers.Default) {
    while (state.first() < 1000) {
        println(state.first())
        delay(250)
    }
}
The one thing I'm sure of is that it needs to be a hot flow, the coroutine that's listening for incoming messages from the socket needs to change the state constantly whether there's a listener for the state or not. Anything that's listening on the state is only interested in the current state and if it's not the right state, it should wait for the state to switch to the right state before completing. It feels like I'm overlooking something simple here ...
a
You can subscribe to a StateFlow (or SharedFlow) by applying some intermediate operators and then launching it:
state
    .onEach { print(it) }
    .onCompletion { println(" done!") }
    .launchIn(this)
Does that help?
j
ah, so the magic line here is launchIn(this), let me give that a try! That's an interesting solution. I also got it working with a Mutex: after counter++, unlock the mutex; after consuming the value, lock the mutex again.
var counter = 1
val mutex = Mutex(locked = true)


launch(Dispatchers.Default) {
    while (true) {
        counter++
        mutex.unlock()
        delay(1000)
    }
}

launch(Dispatchers.Default) {
    while (true) {
        if (counter > 100) {
            break
        }
        println(counter)
        mutex.lock()
        delay(250)
    }
}
That state.onCompletion will never be reached, though; the flow will forever be receiving data. How would that mechanism wait until the counter is 100, for example, before println("launched...")?
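One possible answer to the question above, as a minimal sketch rather than the approach anyone in the thread settled on: bounding the flow with takeWhile lets the subscription complete, so onCompletion fires and the launched job can be joined. The helper name collectUpTo, the limit of 5, and the 10 ms producer delay are illustrative assumptions. Because a StateFlow is conflated, a slow collector could skip intermediate values.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects values until one exceeds `limit`,
// then returns everything that was seen.
suspend fun collectUpTo(state: StateFlow<Int>, limit: Int): List<Int> {
    val seen = mutableListOf<Int>()
    state
        .takeWhile { it <= limit }          // ends the flow at the first value above `limit`
        .onEach { seen += it }
        .onCompletion { println(" done!") } // reached once takeWhile ends the flow
        .collect()
    return seen
}

fun main() = runBlocking {
    val state = MutableStateFlow(1)

    // Producer keeps updating the state whether or not anyone listens.
    val producer = launch {
        while (true) {
            delay(10)
            state.value += 1
        }
    }

    // Same shape as the launchIn(this) suggestion, but bounded, so join() returns.
    val job = state
        .takeWhile { it <= 5 }
        .onEach { print(it) }
        .onCompletion { println(" done!") }
        .launchIn(this)
    job.join()

    println(collectUpTo(state, limit = 10))
    producer.cancel()
}
```

The code after job.join() only runs once the bound is hit, which is the "wait until the counter is 100" behaviour with a smaller number.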
a
if you already have an instance why not just call collect?
j
Somebody suggested that on StackOverflow as well, but doesn't state.collect require the MutableStateFlow to end before it collects? The flow will never end, it will remain for as long as the application is running. What did work was doing
state.first { it == IDLE }
That line suspends until the state is IDLE, and only then does the code after it execute, which is 100% what I've been looking for.
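A minimal sketch of that behaviour, assuming a hypothetical DriverState enum and awaitIdle helper (neither appears in the thread): first with a predicate returns at once when the current value already matches, and otherwise suspends the coroutine, without blocking the thread, until a matching value arrives.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical state type standing in for the driver's real one.
enum class DriverState { BUSY, IDLE }

// Suspends until the state is IDLE; returns immediately if it already is.
suspend fun awaitIdle(state: StateFlow<DriverState>): DriverState =
    state.first { it == DriverState.IDLE }

fun main() = runBlocking {
    val state = MutableStateFlow(DriverState.IDLE)

    awaitIdle(state) // already IDLE, so no waiting
    println("first action can run")

    state.value = DriverState.BUSY
    launch {
        delay(50) // stands in for the socket reporting back
        state.value = DriverState.IDLE
    }

    awaitIdle(state) // suspends until the launched coroutine sets IDLE
    println("second action can run")
}
```

Note that the state can change again right after awaitIdle returns, so a real driver may need extra coordination if two callers must not both act on the same IDLE.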
z
No, the main use case of flows is to collect them, and your collector gets called when the upstream emits. Google “reactive streams”, which flows are an implementation of.
A flow is just this:
interface Flow<T> {
  suspend fun collect(emit: suspend (T) -> Unit)
}
The simplest implementation of a flow is
val myFlow = object : Flow<String> {
  override suspend fun collect(emit: suspend (String) -> Unit) {
    emit("hello")
    // do some stuff, suspend, etc.
    emit("world")
  }
}
Everything else, all the flow operators, and even MutableStateFlow, are just more complicated versions of that.
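The interface above is a simplification: in kotlinx.coroutines, Flow.collect takes a FlowCollector<T> rather than a bare lambda, and flows are usually created with the flow { } builder instead of implementing the interface by hand. A sketch of the same "hello"/"world" flow using the builder might look like this:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// A cold flow: the block runs from the start for every collector.
val myFlow: Flow<String> = flow {
    emit("hello")
    // do some stuff, suspend, etc.
    emit("world")
}

fun main() = runBlocking {
    // collect suspends until the flow block has finished emitting
    myFlow.collect { println(it) }

    // Collecting again reruns the block and yields the same values
    println(myFlow.toList())
}
```

Because this flow is finite, collect returns after "world". A StateFlow never completes on its own, which is why collecting one needs an operator such as first or takeWhile to end it.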
l
collect is what you're looking for. A StateFlow is still a Flow.
j
maybe give an example? how would I stop it from collecting when the value reaches > 100 to println the "finished" section?
launch(Dispatchers.Default) {
    var counter = 1
    while (true) {
        state.emit(counter++)
        delay(10000)
    }
}

state.collect {
    if (it < 100) {
        println("$it")
        return@collect
    }
}

println("finished")
l
Just add an operator before: take(100)
j
for the use-case I'm experimenting with, I don't know how many values I'll be getting before I hit the IDLE state
l
j
Use-case: I have a state inside a driver that runs across a socket that I'm building; before the next action is executed, it needs to wait until the state the socket writes back is IDLE, but if the state is already IDLE, it shouldn't have to wait for a state change.
l
You can use transformWhile, or another that might suit/match your specific use case better
j
this works
state.first { it == IDLE }
just wondering why everyone is suggesting to use collect, can't see how that would work
l
If you just want to wait for a particular value, then, yes, just use that.
Anyway, read the doc I linked (functions tab), it's really helpful to know all the built-in operators, most of them can help you now, or one day
j
for academic reasons, how would you implement the same thing with collect, or would you have to implement the guts of what first is doing to get the same result?
l
Look at how first is implemented, and you'll have your answer. Your IDE should let you cmd/ctrl+click into it
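For the academic question, here is a rough sketch of a first-like helper built only from public operators plus collect. It is not how the library implements first (the library aborts collection through an internal mechanism); the name awaitMatch is hypothetical, and transformWhile is the operator suggested earlier in the thread.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the first value matching `predicate`,
// suspending until one arrives. Throws if the flow ends without a match.
suspend fun <T> Flow<T>.awaitMatch(predicate: (T) -> Boolean): T {
    var found = false
    var result: T? = null
    this
        .transformWhile<T, T> { value ->
            if (predicate(value)) {
                emit(value) // pass the match downstream
                false       // and stop collecting upstream
            } else {
                true        // not a match, keep going
            }
        }
        .collect { value ->
            result = value
            found = true
        }
    if (!found) throw NoSuchElementException("Flow completed without a matching value")
    @Suppress("UNCHECKED_CAST")
    return result as T
}

fun main() = runBlocking {
    val state = MutableStateFlow(0)
    launch {
        repeat(5) {
            delay(10)
            state.value += 1
        }
    }
    // Suspends until the state reaches at least 3
    println(state.awaitMatch { it >= 3 })
}
```

In practice state.first { ... } does the same job with less code, so this is only useful for seeing how collect can be made to stop.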
u
> just wondering why everyone is suggesting to use collect, can’t see how that would work
Using collect would be the reactive stream approach. Instead of ‘actively’ waiting, you let state trigger your actions:
scope.launch {
  socketState.collect { state ->
    if (state == IDLE) nextCommand.execute()
  }
}