#coroutines

Andrew Ebling

03/08/2021, 11:51 AM
I have a stream of events coming into a ReceiveChannel every 10ms, representing the state of some buttons on an external controller. When I call a method, I need to wait until I see 10 no-button-press events, then 10 button press events of a given type in a row, followed by 10 more no-button-press events, and then call a supplied completion handler. So far I’ve got as far as this:
fun readButtonPress(press: ButtonPress, completion: (ButtonPress) -> (Unit)) {
  launch {
     eventReceiveChannel
        .consumeAsFlow()
        // wait for:
        // at least 10x no press events
        // 10x or more press events
        // at least 10x no press events
        .consume {
            completion(buttonPress)
        }
     }
}
My questions are therefore:
1. Am I on the right track? I want to contain coroutine use to this class, as it needs to be called from a callback-based API.
2. What is the correct way to track state in this scenario (i.e. the number of events received in a row of a given type)?
3. What are the correct Flow methods to use to achieve this goal?
I think I’ve just about got my head around coroutine concepts and am beginning to get my head around Flow, but my brain is blowing a fuse trying to figure this out.

araqnid

03/08/2021, 12:24 PM
The function to consume items out of a flow is collect, and it’s likely you basically want to create your own “operator” to collect items from upstream and update some local state:
fun Flow<PressOrTickEvent>.watch(): Flow<Boolean> {
  return flow<Boolean> {
    var state = 0 // 0 = 10 or more no-press events, 1 = 10 or more press events, 2 = 10 or more no-press events, 3 = finished
    var eventCounter = 0
    fun reset() { state = 0; eventCounter = 0 }
    collect { event ->
      when (event) {
        is PressEvent -> {
          // advance state, eventCounter or call reset()
        }
        is TickEvent -> {
          // advance state, eventCounter etc
          // when entire sequence detected:
          if (state == 3) {
            emit(true)
          }
        }
      }
    }
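  }.take(1) // complete after the first match; take(1) keeps the declared Flow<Boolean> return type, whereas first() would return a plain Boolean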
}
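The placeholder comments in the operator above could be filled in roughly like this. It is only a sketch under assumptions: the event types (Button, ControllerEvent, NoPress, Press) and the PressSequenceMatcher class are hypothetical names, not something from the original code, and it assumes that any out-of-sequence event simply restarts the match. Keeping the counting in a plain class means it can be checked without any coroutines involved.

```kotlin
// Hypothetical event model; the real event types are not shown in this thread.
enum class Button { A, B }

sealed class ControllerEvent
object NoPress : ControllerEvent()
data class Press(val button: Button) : ControllerEvent()

/**
 * Tracks progress through the sequence:
 * at least n idle events, then at least n presses of `target`, then n idle events.
 * Assumption: any event that breaks the sequence restarts matching.
 */
class PressSequenceMatcher(private val target: Button, private val n: Int = 10) {
    private enum class Phase { IDLE_BEFORE, PRESSING, IDLE_AFTER }

    private var phase = Phase.IDLE_BEFORE
    private var count = 0

    private fun restart(initialCount: Int) {
        phase = Phase.IDLE_BEFORE
        count = initialCount
    }

    /** Feeds one event; returns true on the event that completes the sequence. */
    fun accept(event: ControllerEvent): Boolean {
        val isIdle = event is NoPress
        val isTarget = event is Press && event.button == target
        when (phase) {
            Phase.IDLE_BEFORE -> when {
                isIdle -> count++
                // Enough idle events seen, so the first target press starts the next phase.
                isTarget && count >= n -> { phase = Phase.PRESSING; count = 1 }
                else -> restart(0) // wrong button, or a press that came too early
            }
            Phase.PRESSING -> when {
                isTarget -> count++
                isIdle && count >= n -> { phase = Phase.IDLE_AFTER; count = 1 }
                // Released too early (this idle event counts towards a new run) or wrong button.
                else -> restart(if (isIdle) 1 else 0)
            }
            Phase.IDLE_AFTER -> when {
                isIdle -> count++
                else -> restart(0) // pressed again before the trailing idle run finished
            }
        }
        if (phase == Phase.IDLE_AFTER && count >= n) {
            restart(0)
            return true
        }
        return false
    }
}
```

Inside the flow builder, the collect lambda would then reduce to feeding each event to a matcher instance and calling emit(true) whenever accept returns true.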
Not sure what you wanted to happen if you see the “wrong” sequence of input events - just reset or somehow fail matching? The watch() operator basically transforms the input flow into a flow that emits a single “true” when the sequence has been matched. Then you can do something like:
launch {
 eventReceiveChannel
  .consumeAsFlow()
  .watch()
  .collect {
     completion(buttonPress)
  }
}
Not sure which buttonPress you intend to pass to the callback: maybe you need the watch() function to emit it instead of ‘true’? Or are you passing the ‘press’ parameter value to the callback? You also need to create a CoroutineScope in your class so you can call ‘launch’ on it. That also lets you add a ‘cancel’ method that cancels the scope.
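As a rough illustration of the scope advice, a class that owns its CoroutineScope might look like the sketch below. The class name ButtonReader, the String event type, and the choice of SupervisorJob plus Dispatchers.Default are assumptions made for the example, and the sequence detection is replaced by a simple stand-in (waiting for the first matching event) to keep it short.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.launch

// Hypothetical class; events are plain Strings here purely for illustration.
class ButtonReader(private val events: ReceiveChannel<String>) {
    // The class owns its scope, so callers only ever see the callback-based API.
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    fun readButtonPress(press: String, completion: (String) -> Unit) {
        scope.launch {
            // Stand-in for the real sequence detection: suspend until the first
            // matching event. firstOrNull returns null if the channel closes first.
            // Note: consumeAsFlow() can be collected only once per channel.
            val match = events.consumeAsFlow().firstOrNull { it == press }
            // The callback runs on a Dispatchers.Default thread in this sketch.
            if (match != null) completion(match)
        }
    }

    // Cancels every coroutine launched by this reader.
    fun cancel() = scope.cancel()
}
```

Calling cancel() when the owner goes away stops any read that is still waiting, so the completion handler is not invoked after that point.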

Andrew Ebling

03/08/2021, 1:44 PM
Awesome - many thanks for your help - really appreciate it!