Is there something I can do for the use case of: I...
# flow
s
Is there something I can do for the use case of: I am observing a flow. I want to start an infinite loop when I receive a specific type (sealed, limited options) the flow gives me. When I get the same type again, I do not want to re-start this infinite loop but let it keep going. When I get a different type I want to stop that old infinite loop and do nothing until I again get the type I’m looking for. Code:
Copy code
sealed interface Types {
  object One : Types
  object Two : Types
}
val someFlow: Flow<Types>

someFlow
  .filterIsInstance<Types.One>()
  .distinctUntilChanged()
  .collectLatest() { one ->
    // I want this to be cancelled on an emission of `Two` from `someFlow` but keep on going alive without interruption on a repeat emission of `One` if it was `One` before too
    coroutineScope {
      while(isActive) {
        doWork(one)
      }
    }
  }
Hmm, I am now doing
Copy code
.map { it as? Types.One }
.distinctUntilChanged()
.collectLatest { one: Types.One? ->
  if (one == null) return@collectLatest
  //rest of loop code
}
And it seems to be working fine. But the annoying part is that I need to manually exit it with the if check in the first line, I guess this is unavoidable right? And overall I just feel like this could be done a bit nicer without the null castings etc. I would love to hear if someone has a better idea 🤔
e
If I understand you correctly, what you want is the equivalent in Flow of this Rx operator? https://github.com/akarnokd/RxJavaExtensions#flowabletransformersvalve
s
I’ve never worked with RX, honestly I’m having a hard time understanding what the thing you linked to me does 😅 RX looks like absolute magic to me tbh My use case is this: I have a AudioPlayer class, basically has a
MediaPlayer
inside of it, but also keeps track of an internal State with the options
Playing, Paused, End
etc. In order to then make sure my UI shows how far in the video we currently are, since I don’t think the Android
MediaPlayer
has a callback to report how far in the video it is, I am this coroutine to every X milliseconds get the current progress %
Copy code
@FloatRange(from = 0.0, to = 1.0)
private fun MediaPlayer.getProgressPercentage(): Float {
    return (currentPosition.toFloat() / duration.toFloat()).coerceIn(0f, 1f)
}
to then in turn update the Playing state that also contains a progress Float inside of it. And I want to start this coroutine when the state turns to
Playing
and let it report back really fast, but then stop it when we’re not on
Playing
anymore. Here is the source code
e
Copy code
fun <T : Any> Flow<T>.valve(valveSource: Flow<Boolean>): Flow<T> =
  this.combine(valveSource.distinctUntilChanged()) { value: T, valve: Boolean -> value to valve }
    .mapNotNull { (value, valve) -> value.takeIf { valve } }
👍 1
s
I am actually quite confused as to how this would be applied to solve my problem 🤔
e
Copy code
someFlow.distinctUntilChanged()
    .flatMapLatest { types ->
      if (types is Types.One) {
        flow {
          while (true) {
            emit(doWork(types))
          }
        }
      } else emptyFlow()
    }
👍 1
You're right,
valve
does not apply to your specific problem, but the code snippet above it should
s
What I did settle with is this, and it’s working good enough:
Copy code
audioPlayerState
    .map { type -> type.shouldContinuouslyDoWork } // added this as a property to Types to make this easier
    .distinctUntilChanged()
    .collectLatest { shouldContinuouslyDoWork: Boolean ->
        if (shouldContinuouslyUpdateProgress) {
            coroutineScope {
                while (isActive) {
                    delay(///)
                    doWork()
                }
            }
        } else {
            doWork()
        }
    }
Since I am doing the work in-line there and don’t need to transform this to another flow to be used somewhere there was no need for the flatMapLatest that you suggested. But if I did, that looks like a great approach yeah. Thanks a lot for trying to help me out btw!
e
Yeah, the idea is the same, normally I expose Flows in ViewModel and collect them in the View, I find it easier to test that way using Turbine, I would be keen to see how you’re testing that code if you don’t mind. Thanks
s
This is in a bit of a different context, not a ViewModel exposing flows and a view observing them. It’s a class that does all the work inside itself and exposes a simple interface that exposes three functions to start/stop/seek on the media player and a StateFlow with it’s current internal state for the UI to observe. Implementation is here With that said, I am not testing this (oops 😳) so yeah...