Trying to come up with good way to suspend until t...
# coroutines
d
Trying to come up with good way to suspend until the first emission from the flow which is launched in parallel. app code:
Copy code
class Feature  {
   private val scope = CoroutineScope()
   private val _flow = MutableSharedFlow<Int>()
   
   fun start() {
     scope.launch { flow.emit(1); flow.emit(2) }
   }

   val flow: Flow<Int> = _flow
}
test code:
Copy code
should("do something") {
 val awaitFirstEmit = feature.flow.onStart { feature.start() }.first()
 awaitFirstEmit shouldBe 1
}
Is it OK to use
onStart
like this, or will this produce flakiness? the important thing here is I want to ensure that first item is not lost.
j
If
Feature.flow
was exposed as
SharedFlow
, you could use
onSubscription
for this, which is meant for that. As for
Flow.onStart
, I'm not 100% sure there is this guarantee
d
yep, that's the thing, it wasn't exposed as shared and it's kind of an implementation detail...
But this is my library, so I'll think about on how to restructure stuff so that I can produce some kind of combinator which relies on
onSubscription
internally
j
Yeah I just checked the code, and the guarantee doesn't apply. Basically the body of
onStart
is run before the upstream collection begins - effectively calling
feature.start()
before subscribing to the shared flow. You can prove it doesn't work by adding an atrifical delay after
feature.start()
in
onStart
in your test. It will hang forever if it missed the emissions
That said, if the whole thing runs in a single thread, without suspension points other than
first()
you could consider it a guarantee that
first
is reached by the main coroutine of the test before the emissions are done by the launched coroutine. But it's probably not your case here, since
CoroutineScope()
likely uses the
Default
dispatcher. And in any case I would personally not like to rely on that 😄
d
Yeah, I use the
Default
by default 🙂 Thank you for suggestions and advice! Will think about how to best handle this!
n
What about:
Copy code
should("do something") {
  val awaitFirstEmit = async { feature.flow.first() }
  advanceUntilIdle() //This should guarantee the subscription
  feature.start()
  //might need another advanceUntilIdle() here but I don't think so
  awaitFirstEmit.await() shouldBe 1
}
d
Nice! Thank you! Didn't use test scope and koTest (which I use) even supports it too.