https://kotlinlang.org logo
#coroutines
Title
# coroutines
m

mng

12/14/2020, 10:41 PM
I recently created a Timer that is handled through Coroutines, encapsulated as a
TimerFlow
object. I wanted to Unit Test this object but I stumbled across a problem, because I am exposing a
SharedFlow
for every “tick” of the Timer I am having trouble testing it since the
replay
value is set to 0. What are the best practices in regards to setting up a Timer like I am here, should I even use a
SharedFlow
to expose every tick or should I be using something else? If the approach I am using is correct, what would be the proper way for me to test this?
Copy code
class TimerFlow(
    val type: TimerType,
    val tickIntervalMillis: Long,
    private val externalScope: CoroutineScope
) {
    private val _timer = MutableSharedFlow<TimerTick>(replay = 0)
    val timer: SharedFlow<TimerTick> = _timer

    private var state: TimerState = TimerState.NotStarted
    private var _elapsedTimeMillis = 0L
    val elapsedTimeMillis: Long
        get() = _elapsedTimeMillis

    private val timerAsync: Deferred<Unit> = getCorrectTimerAsync(type)

    fun start() {
        if (state == TimerState.Completed || state == TimerState.Stopped) return

        externalScope.launch { handleStateChange(TimerState.Started) }
    }

    fun stop() {
        externalScope.launch { handleStateChange(TimerState.Stopped) }
    }

    private suspend fun tick() {
        _timer.emit(TimerTick(
            elapsedTimeMillis = _elapsedTimeMillis,
            state = state
        ))
        delay(tickIntervalMillis)
        _elapsedTimeMillis += tickIntervalMillis
    }

    private suspend fun handleStateChange(state: TimerState) {
        this.state = state
        when (state) {
            TimerState.NotStarted -> {
                /**
                 * Do nothing
                 */
            }
            TimerState.Started -> {
                timerAsync.start()
            }
            /**
             * Cancel the timer and send the final tick
             */
            TimerState.Stopped,
            TimerState.Completed -> {
                timerAsync.cancel()
                _timer.emit(TimerTick(
                    elapsedTimeMillis = elapsedTimeMillis,
                    state = state
                ))
            }
        }
    }

    private fun getCorrectTimerAsync(type: TimerType): Deferred<Unit> {
        return when (type) {
            TimerType.Indefinite -> {
                externalScope.async(start = CoroutineStart.LAZY) {
                    while (state != TimerState.Stopped &&
                        state != TimerState.Completed && this.isActive) {
                        tick()
                    }
                }
            }
            is TimerType.Duration -> {
                externalScope.async(start = CoroutineStart.LAZY) {
                    while (state != TimerState.Stopped &&
                        state != TimerState.Completed && this.isActive) {
                        if (_elapsedTimeMillis < type.durationMillis) {
                            tick()
                        } else {
                            handleStateChange(TimerState.Completed)
                        }
                    }
                }
            }
        }
    }
}
z

Zach Klippenstein (he/him) [MOD]

12/14/2020, 11:52 PM
I am having trouble testing it since the replay value is set to 0.
I don’t see why that should make testing a problem, can you elaborate why the tests aren’t working?
Also, if you haven’t seen Turbine, it might be helpful: https://github.com/cashapp/turbine
👀 1
m

mng

12/15/2020, 4:03 AM
I don’t see why that should make testing a problem, can you elaborate why the tests aren’t working?
Perhaps it's the way I'm setting up the test. I'm emitting values prior to subscribers of the flow so they're not getting the value. What ends up happening is that I get an error saying the job never finishes as it's waiting for a value.
Copy code
// given
val myTimer = TimerTask(Duration(100L), 50L, theScope)

// when
myTimer.start()
val result = myTimer.timer.first()

// then
result.elapsedTime shouldBeEqualTo 50
result.state shouldBeEqualTo Started
I have a test like this but it will never progress pass
val result = myTimer.timer.first()
when debugging and give an error saying "this job has not yet completed"
k

KamilH

12/15/2020, 6:12 AM
In your test, what is
theScope
? Also what is the scape in which you are running your test? I copied your code and tried to run this test in `runBlockingTest`:
Copy code
@Test
fun test() = runBlockingTest {
    // given
    val myTimer = TimerFlow(TimerType.Duration(100L), 50L, this)

    // when
    myTimer.start()
    val result = myTimer.timer.first()

    // then
    assertEquals(50, result.elapsedTimeMillis)
    assertEquals(TimerState.Started, result.state)
}
and it works fine, test is passing
m

mng

12/15/2020, 3:51 PM
thanks for taking a look, @KamilH. I was able to get this one to pass in particular! I think I was messing up by using a new
TestCoroutineScope
instead of
this
. If i were to try to test my Indefinite type Timer and want to check that it is not stopped until after I call stopped how would I go about doing that?
z

Zach Klippenstein (he/him) [MOD]

12/15/2020, 4:21 PM
If you emit before subscribing, and you don’t reply, then you’re not going to see those items. There are two ways to fix that: 1) Explicitly subscribe before emitting, e.g. using a channel. This is what Rx’s
.test()
and Turbine does. 2) “Pause” all the coroutines until you subscribe. This is what using
runBlockingTest
does.
👍 1