I’m struggling to write tests around `callbackFlow...
# coroutines
r
I’m struggling to write tests around `callbackFlow`. I want to ensure that the value emitted from the flow matches the value that the callback receives. My problem appears to be that the `ProducerScope` block doesn’t get called until something subscribes to the flow. I thought that using `shareIn` would fix my problem, but it still doesn’t appear to kick off the `ProducerScope` in time:
fun someTest() = runBlocking {
  val expected = "someMessage"

  val resultFlow = getCallbackFlow().shareIn(this, SharingStarted.Eagerly, replay = 1)

  sendMessageToTriggerCallback(expected)

  val actual = resultFlow.first()

  assertEquals(expected, actual)
}
After adding some logging, I find that what happens is that first `sendMessageToTriggerCallback` is invoked, then the callback is registered in the `ProducerScope`, then it suspends indefinitely awaiting the first signal from the `resultFlow`. I’ve tried a lot of variations on this pattern, and I can’t seem to make the `ProducerScope` run without blocking my ability to subsequently trigger sending the message. I’ve tried launching callback flow collection in its own job, sending the message in a separate job, and yielding between them. I’ve tried leveraging `onSubscription`, but that doesn’t fix the issue either; I get the same execution order as in the example above. I’m at a bit of a loss as to how to enforce a deterministic execution order in this test case.
j
Using `SharedFlow.onSubscription` should be what you need, though. It's strange that it doesn't work. Do you mind sharing your `callbackFlow` definition? Have you tried launching the `first()` inside an `async(start = UNDISPATCHED)`? This is what I usually do.
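For reference, a minimal sketch of the undispatched `async` pattern mentioned above, shown against a plain `MutableSharedFlow` rather than the poster's flow. The function name `undispatchedDemo` is made up for illustration. The pattern only guarantees that the collector is subscribed before the next statement runs; it says nothing about when a `callbackFlow` body registers its listener.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo function, not taken from the thread.
fun undispatchedDemo(): String = runBlocking {
    // No replay: a value emitted before anyone subscribes would be lost.
    val shared = MutableSharedFlow<String>()

    // UNDISPATCHED runs the body immediately, up to its first suspension point,
    // so the collector is already subscribed when async() returns.
    val actual = async(start = CoroutineStart.UNDISPATCHED) { shared.first() }

    // Suspends until the subscriber above has received the value.
    shared.emit("someMessage")

    actual.await()
}
```

With the default (dispatched) start, the `emit` call could run before the collector subscribes, and `first()` would then wait forever.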
r
Yeah, that was my assumption. It’s a bit complicated to share the details of the `callbackFlow`. It essentially binds a callback to an event bus. I have to scrub the impl, give me a minute.
I hope the event bus implementation isn’t the issue. That’s a very ugly beast to unravel.
j
Note that depending on the implementation, both undispatched `async` and `onSubscription` might be insufficient. They just ensure the collector is ready to receive the first emit, but they don't ensure that the callback flow's body has reached the actual listener registration point.
r
ah, yeah, then the bus may very well be the issue
j
The most reliable way to ensure that is to emit a sentinel event that says when the callback has been registered
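A rough sketch of the sentinel idea, under stated assumptions: `FakeBus`, `BusEvent`, `Registered`, `Message`, `events()` and `sentinelDemo` are all hypothetical names invented here, and the real event bus will look different. The flow emits a `Registered` marker right after the listener is attached, and the test waits for that marker before posting.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical minimal event bus, for illustration only.
class FakeBus {
    private val listeners = mutableListOf<(String) -> Unit>()
    fun register(l: (String) -> Unit) { listeners += l }
    fun unregister(l: (String) -> Unit) { listeners -= l }
    fun post(msg: String) = listeners.toList().forEach { it(msg) }
}

sealed interface BusEvent
object Registered : BusEvent                      // sentinel: the listener is attached
data class Message(val text: String) : BusEvent   // a real bus message

fun FakeBus.events(): Flow<BusEvent> = callbackFlow {
    val listener: (String) -> Unit = { trySend(Message(it)) }
    register(listener)
    trySend(Registered)                 // emitted only after registration
    awaitClose { unregister(listener) } // avoid leaking the listener
}

fun sentinelDemo(): String = runBlocking {
    val bus = FakeBus()
    val registered = CompletableDeferred<Unit>()
    val actual = async {
        bus.events()
            .onEach { if (it is Registered) registered.complete(Unit) }
            .filterIsInstance<Message>()
            .first()
            .text
    }
    registered.await()      // the callback is known to be registered from here on
    bus.post("someMessage")
    actual.await()
}
```

Because the test waits on the sentinel instead of on scheduling order, it does not matter how many dispatch steps sit between collection and registration.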
r
hm, I could probably spike the registration call with the event signal 🤔
thanks for taking the time to ponder this with me 🙏
j
I've faced so many similar issues when implementing the krossbow library (a STOMP/web socket client), I'm happy to help!
a
it's exactly as you say, @Joffrey, this is where it happens: https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt#L119
a `callbackFlow`/`channelFlow` block is launched with `CoroutineStart.ATOMIC`, so it ends up in the dispatch queue even if you launch the collector coroutine undispatched
r
I was able to isolate the problem to the event bus implementation. 😞
Thanks for all the help 🙏
a
you might find this useful: a quick and dirty implementation of a `callbackFlow {}` builder that launches the producer block undispatched so that you don't miss events in cases like this: https://pl.kotl.in/2yeSbUSYQ
from your event bus => flow registration code though, you seem to be leaking your event bus listener (you should be closing/unregistering as part of the `awaitClose {}` block) and your `flowOn` operator is introducing another dispatch step
if your event bus callback registration is thread-safe then you don't need the `flowOn` at all; if it's not thread-safe and you're using the `flowOn` to guarantee which thread you're registering/unregistering on, then you should add the `flowOn` in your real/live integration code and omit it in the unit under test here
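One possible shape for that split, as a sketch only. `SimpleBus`, `messages()`, `messagesOn()` and `flowOnDemo` are hypothetical names, not the poster's code. The flow under test unregisters in `awaitClose` and carries no `flowOn`; the dispatcher is applied in a separate wrapper meant for integration code.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the real event bus.
class SimpleBus {
    private val listeners = mutableListOf<(String) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun register(l: (String) -> Unit) { listeners += l }
    fun unregister(l: (String) -> Unit) { listeners -= l }
    fun post(msg: String) = listeners.toList().forEach { it(msg) }
}

// Unit under test: no flowOn, so collection adds no extra dispatch step.
fun SimpleBus.messages(): Flow<String> = callbackFlow {
    val listener: (String) -> Unit = { trySend(it) }
    register(listener)
    // Runs when the collector cancels or the channel closes, so nothing leaks.
    awaitClose { unregister(listener) }
}

// Integration code: pin registration/unregistration to a dispatcher only here.
fun SimpleBus.messagesOn(dispatcher: CoroutineDispatcher): Flow<String> =
    messages().flowOn(dispatcher)

fun flowOnDemo(): Pair<String, Int> = runBlocking {
    val bus = SimpleBus()
    val actual = async { bus.messages().first() }
    // Test-side wait: yield until the producer block has registered its listener.
    while (bus.listenerCount == 0) yield()
    bus.post("someMessage")
    // Returns the received value and the listener count after collection ended.
    actual.await() to bus.listenerCount
}
```

Exposing `listenerCount` is a test convenience assumed here; it lets the test both wait for registration and check afterwards that the listener was removed.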
r
Yeah, there’s a special utility function that I omitted from the `awaitClose` call that handles unregistering the callback.
a
(come to think of it I should have omitted the default empty/no-op lambda from the copied ProducerScope API in the playground link above to make it a little harder to forget to unregister the callbacks)
r
That’s a really great example, thank you! I’m looking it over right now. All I’m trying to do at the moment is write tests around existing code so that I can change it safely