#coroutines

jeff

02/26/2021, 3:51 PM
This test fails:
@Test
  fun `emitting too fast`() {
    val scope = CoroutineScope(Dispatchers.Default)
    val actionFlow = MutableSharedFlow<String>(extraBufferCapacity = 10)
    var success = false

    actionFlow
      .onEach { success = true }
      .launchIn(scope) // 1

    actionFlow.tryEmit("HELLO") // 2
    

    runBlocking {
      withTimeout(1000) {
        while (!success) {
          yield()
        }
      }
      success shouldBe true
    }
  }
I think I understand why: the 'subscription' to actionFlow is launched (1) and therefore hasn't happened yet by the time I try to emit (2). Because a `SharedFlow` doesn't replay anything by default, an emission made while there are no subscribers is just dropped, and that is what happens to "HELLO". My question is, what's the best way to make sure 1 is ready before I do 2? I can imagine there's a way to use SharedFlow.subscriptionCount, but that seems inelegant. Any better way?

araqnid

02/26/2021, 4:11 PM
Either specify replay = 1 on the MutableSharedFlow, and it will replay the “HELLO” event on subscription,
or put some delay in before the tryEmit, e.g. keep everything in one scope and yield() to give the subscriber a chance to launch:
val actionFlow = MutableSharedFlow<String>(extraBufferCapacity = 10)
var success = false
runBlocking {
    val job = actionFlow
        .onEach { success = true }
        .launchIn(this) // 1
    try {
        yield()
        actionFlow.tryEmit("HELLO") // 2
        withTimeout(1000) {
            while (!success) {
                yield()
            }
        }
        assertTrue(success)
    } finally {
        job.cancel()
    }
}
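For reference, a minimal sketch of the replay = 1 suggestion. It is not code from this thread: the function name `lateSubscriberReceives` is a hypothetical helper, and `first()` stands in for the `onEach`/`launchIn` collector to keep the example short.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Hypothetical helper: emits before anyone subscribes, then subscribes late.
fun lateSubscriberReceives(): String = runBlocking {
    // replay = 1 keeps the most recent value for subscribers that arrive later.
    val actionFlow = MutableSharedFlow<String>(replay = 1)

    // No subscriber exists yet; the value goes into the replay cache.
    actionFlow.tryEmit("HELLO")

    // A late subscriber still receives the cached value.
    withTimeout(1000) { actionFlow.first() }
}
```

Calling `lateSubscriberReceives()` should return "HELLO". The trade-off is that every later subscriber also sees the last value, which may not be wanted if the flow is shared elsewhere.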

jeff

02/26/2021, 4:13 PM
So, I use this flow elsewhere also, and I don't want replay behavior on it.
I tried yield() and it's better, but it still fails around 1 in 40 times (on my box)

araqnid

02/26/2021, 4:14 PM
that’s surprising, I would have thought taking out the use of another thread would make it deterministic

jeff

02/26/2021, 4:16 PM
yeah so would I. what I tried isn't word for word what you've suggested, I'll try that just in case
Nope, interesting, your version works... 🤔
here is what I tried that almost works (fails infrequently):
@Test
  fun `blank username`() {
    val scope = CoroutineScope(Dispatchers.Default).childScope()
    val actionFlow = MutableSharedFlow<String>(extraBufferCapacity = 10)
    var success = false

    actionFlow.onEach {
      success = true
    }
      .launchIn(scope)

    scope.launch {
      yield()
      actionFlow.tryEmit("HELLO")
    }

    runBlocking {
      withTimeout(1000) {
        while (!success) {
          yield()
        }
      }
      success shouldBe true
    }
  }
I think your version only works because the scope being passed to launchIn is the one from runBlocking, which is using a single-threaded dispatcher.
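The single-threaded ordering can be seen in isolation with a small sketch. It assumes nothing beyond `runBlocking`, `launch` and `yield`; the function name `executionOrder` is made up for illustration.

```kotlin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: records the order in which parent and child run.
fun executionOrder(): List<String> {
    val order = mutableListOf<String>()
    runBlocking {
        // launch only queues the child on runBlocking's event loop; it has not run yet.
        launch { order.add("child") }
        order.add("parent before yield")
        // yield() hands the single thread to the queued child before the parent resumes.
        yield()
        order.add("parent after yield")
    }
    return order
}
```

Because everything runs on one thread, the child gets its turn exactly at `yield()`. With `Dispatchers.Default` the child runs on another thread, so a single `yield()` gives no such guarantee.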
For posterity, I solved it by throwing a
while (actionFlow.subscriptionCount.value == 0) {}
around the yield. I still think there's probably a more elegant way, but this works for now. Thanks for your help!
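The busy loop can also be written as a suspending wait. This is a sketch under assumptions, not the code used in this thread: `subscriptionCount` is a `StateFlow<Int>`, so `first { it > 0 }` suspends until a collector has registered. The function name `emitAfterSubscription` is hypothetical, and a `CompletableDeferred` replaces the shared `success` flag so the result is handed across threads safely.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Hypothetical helper: waits for the subscriber, emits, and returns what was received.
fun emitAfterSubscription(): String = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)
    val actionFlow = MutableSharedFlow<String>(extraBufferCapacity = 10)
    val received = CompletableDeferred<String>()

    actionFlow
        .onEach { received.complete(it) }
        .launchIn(scope) // 1

    try {
        withTimeout(1000) {
            // Suspend (rather than spin) until the collector has subscribed.
            actionFlow.subscriptionCount.first { it > 0 }
            actionFlow.tryEmit("HELLO") // 2
            received.await()
        }
    } finally {
        // Stop the collector so the scope does not leak past the test.
        scope.cancel()
    }
}
```

The `withTimeout` keeps the test from hanging if the subscriber never shows up.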

araqnid

02/26/2021, 5:04 PM
Yes, that was an intentional choice: have only one scope, avoid any other dispatcher. That works for simple tests, but I guess it can be harder to push down into larger chunks. Spinning until the subscription count has increased makes sense... all sorts of weird things come out in test fixtures 😄
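When a separate dispatcher has to stay, one possible variation is to start the collector with `CoroutineStart.UNDISPATCHED`. This is an assumption, not something tried in this thread. As far as I understand it, the coroutine body then runs on the calling thread up to its first suspension point, and collecting a `SharedFlow` registers the subscriber before it first suspends. The function name `emitAfterUndispatchedStart` and the `CompletableDeferred` are illustrative.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Hypothetical helper: subscribes eagerly, emits, and returns what was received.
fun emitAfterUndispatchedStart(): String {
    val scope = CoroutineScope(Dispatchers.Default)
    val actionFlow = MutableSharedFlow<String>(extraBufferCapacity = 10)
    val received = CompletableDeferred<String>()

    // UNDISPATCHED: the body runs here, on the calling thread, until it first
    // suspends, so the subscription exists by the time launch returns.
    scope.launch(start = CoroutineStart.UNDISPATCHED) {
        actionFlow.onEach { received.complete(it) }.collect()
    }

    // The subscriber is already registered, so the value is buffered for it.
    actionFlow.tryEmit("HELLO")

    return try {
        runBlocking { withTimeout(1000) { received.await() } }
    } finally {
        // Stop the collector so the scope does not leak past the test.
        scope.cancel()
    }
}
```

After the first suspension the collector resumes on `Dispatchers.Default`, so the rest of the pipeline still runs on the intended dispatcher.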