janvladimirmostert
03/11/2024, 10:31 PM
val state = MutableStateFlow<Int>(1)
launch(Dispatchers.Default) {
    var counter = 1
    while (true) {
        state.emit(counter++)
        delay(1000)
    }
}
launch(Dispatchers.Default) {
    while (state.value < 1000) {
        print(state.value)
        delay(250)
    }
}
Currently this is printing
111122223333444...
but I want it to print
1234...
Use-case: I have a state inside a driver that runs across a socket that I'm building; before the next action is executed, it needs to wait until the state the socket writes back is IDLE, but if the state is already IDLE, it shouldn't have to wait for a state change.
Similarly, with a MutableSharedFlow with a replay of 1, first() does not suspend and keeps on printing 1111222..., while last() just never finishes because values are still being emitted and the flow never completes:
val state = MutableSharedFlow<Int>(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
launch(Dispatchers.Default) {
    while (state.first() < 1000) {
        println(state.first())
        delay(250)
    }
}
The one thing I'm sure of is that it needs to be a hot flow: the coroutine that's listening for incoming messages from the socket needs to change the state constantly, whether there's a listener for the state or not. Anything that's listening on the state is only interested in the current state; if it's not the right state, it should wait for the state to switch to the right state before completing.
It feels like I'm overlooking something simple here ...
Adam S
03/11/2024, 11:13 PM
state
    .onEach { print(it) }
    .onCompletion { println(" done!") }
    .launchIn(this)
Does that help?
Adam S
03/11/2024, 11:13 PM
janvladimirmostert
03/11/2024, 11:17 PM
launchIn(this), let me give that a try!
That's an interesting solution.
I also got it working with a Mutex: after counter++, unlock the mutex; after consuming the value, lock the mutex again.
janvladimirmostert
03/11/2024, 11:18 PM
var counter = 1
val mutex = Mutex(locked = true)
launch(Dispatchers.Default) {
    while (true) {
        counter++
        mutex.unlock()
        delay(1000)
    }
}
launch(Dispatchers.Default) {
    while (true) {
        if (counter > 100) {
            break
        }
        println(counter)
        mutex.lock()
        delay(250)
    }
}
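The Mutex handshake above depends on timing: Mutex.unlock() throws IllegalStateException when the mutex is not currently locked, and counter is shared between two coroutines without synchronization. As a hedged alternative, here is a minimal sketch that gets the same "print each new value once, stop past a limit" behaviour by collecting a StateFlow with takeWhile. The printUntil helper, its limit parameter, and the shortened delay are assumptions for illustration, not code from the thread.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: prints every observed value until one exceeds `limit`,
// then returns. `limit` stands in for the hard-coded 100 in the thread.
suspend fun printUntil(state: StateFlow<Int>, limit: Int) {
    // takeWhile completes the flow on the first value failing the predicate,
    // so collect returns even though a StateFlow itself never completes.
    state.takeWhile { it <= limit }.collect { println(it) }
}

fun main() = runBlocking {
    val state = MutableStateFlow(1)

    val producer = launch {
        while (true) {
            delay(100)        // shortened from the 1000 ms used in the thread
            state.value += 1  // single writer, so a plain increment is enough here
        }
    }

    printUntil(state, limit = 5)
    producer.cancel() // otherwise runBlocking would wait on the infinite loop
}
```

The collector suspends between values instead of polling, so no delay(250) or lock is needed on the consuming side.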
janvladimirmostert
03/11/2024, 11:27 PM
andylamax
03/11/2024, 11:45 PM
collect?
janvladimirmostert
03/12/2024, 2:17 PM
state.first { it == IDLE }
that line suspends until the state is IDLE, and only then does the code after it execute, which is 100% what I've been looking for
Zach Klippenstein (he/him) [MOD]
03/12/2024, 4:39 PM
Zach Klippenstein (he/him) [MOD]
03/12/2024, 4:42 PM
interface Flow<T> {
    suspend fun collect(emit: suspend (T) -> Unit)
}
The simplest implementation of a flow is
val myFlow = object : Flow<String> {
    override suspend fun collect(emit: suspend (String) -> Unit) {
        emit("hello")
        // do some stuff, suspend, etc.
        emit("world")
    }
}
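The interface above is a simplification: the real kotlinx.coroutines Flow.collect takes a FlowCollector<T> rather than a plain lambda, so the snippet does not compile against the library as written. A self-contained sketch of the same idea, using a hypothetical SimpleFlow interface and a hypothetical mapEach operator (neither is a library name), might look like this:

```kotlin
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in mirroring the simplified interface above.
interface SimpleFlow<T> {
    suspend fun collect(emit: suspend (T) -> Unit)
}

val myFlow = object : SimpleFlow<String> {
    override suspend fun collect(emit: suspend (String) -> Unit) {
        emit("hello")
        emit("world")
    }
}

// An operator is just another flow whose collect() collects the upstream one
// and forwards transformed values downstream.
fun <T, R> SimpleFlow<T>.mapEach(transform: suspend (T) -> R): SimpleFlow<R> {
    val upstream = this
    return object : SimpleFlow<R> {
        override suspend fun collect(emit: suspend (R) -> Unit) {
            upstream.collect { value -> emit(transform(value)) }
        }
    }
}

fun main() = runBlocking {
    // Prints HELLO, then WORLD.
    myFlow.mapEach { it.uppercase() }.collect { println(it) }
}
```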
Everything else, all the flow operators, and even MutableStateFlow, are just more complicated versions of that.
louiscad
03/12/2024, 9:42 PM
janvladimirmostert
03/12/2024, 10:06 PM
launch(Dispatchers.Default) {
    var counter = 1
    while (true) {
        state.emit(counter++)
        delay(10000)
    }
}
state.collect {
    if (it < 100) {
        println("$it")
        return@collect
    }
}
println("finished")
louiscad
03/12/2024, 10:11 PM
take(100)
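A bare collect on a StateFlow never returns, because a StateFlow never completes, so a println("finished") placed after it is never reached. The sketch below shows how take(n) bounds the collection so that the code after collect does run. The printFirst helper, the small bound of 3, and the short delay are assumptions chosen to keep the run brief.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects at most `n` values, then falls through.
suspend fun printFirst(state: StateFlow<Int>, n: Int) {
    // take(n) cancels the upstream collection after n values, so collect returns.
    state.take(n).collect { println(it) }
    println("finished")
}

fun main() = runBlocking {
    val state = MutableStateFlow(1)

    val producer = launch {
        var counter = 2
        while (true) {
            delay(100)
            state.value = counter++
        }
    }

    printFirst(state, 3)
    producer.cancel() // stop the infinite producer so runBlocking can exit
}
```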
janvladimirmostert
03/12/2024, 10:12 PM
louiscad
03/12/2024, 10:12 PM
janvladimirmostert
03/12/2024, 10:12 PM
Use-case: I have a state inside a driver that runs across a socket that I'm building; before the next action is executed, it needs to wait until the state the socket writes back is IDLE, but if the state is already IDLE, it shouldn't have to wait for a state change.
louiscad
03/12/2024, 10:13 PM
transformWhile, or another that might suit/match your specific use case better
janvladimirmostert
03/12/2024, 10:13 PM
state.first { it == IDLE }
just wondering why everyone is suggesting to use collect, can't see how that would work
louiscad
03/12/2024, 10:13 PM
louiscad
03/12/2024, 10:14 PM
janvladimirmostert
03/12/2024, 10:15 PM
louiscad
03/12/2024, 10:15 PM
first is implemented, and you'll have your answer. Your IDE should let you cmd/ctrl+click into it
uli
03/13/2024, 7:52 AM
scope.launch {
    socketState.collect { state ->
        if (state == IDLE) nextCommand.execute()
    }
}
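For comparison with the collect-based loop above, which reacts to every IDLE for as long as the scope lives, here is a minimal sketch of the state.first { it == IDLE } approach mentioned earlier in the thread, which waits once and then continues. The SocketState enum, the awaitIdle helper, and the timings are assumptions for illustration; the thread only names IDLE.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical state type; only IDLE appears in the thread.
enum class SocketState { IDLE, BUSY }

// Suspends until the state is IDLE; returns right away if it already is,
// because a StateFlow hands its current value to every new collector.
suspend fun awaitIdle(state: StateFlow<SocketState>) {
    state.first { it == SocketState.IDLE }
}

fun main() = runBlocking {
    val state = MutableStateFlow(SocketState.BUSY)

    // Stand-in for the socket reader switching the state back to IDLE.
    launch {
        delay(500)
        state.value = SocketState.IDLE
    }

    awaitIdle(state) // suspends until the launched coroutine sets IDLE
    println("idle, running next action")

    awaitIdle(state) // already IDLE, so no waiting for a state change
    println("still idle")
}
```

Under the hood first is itself built on collect: it collects until the predicate matches and then aborts the collection, which is why both approaches observe the same values.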