# coroutines
d
I have the following helper function that I'm working on to sequence the sending of a message with a blocking receive for a specific message type. The use case is we're sending a message to an external device and listening for the response on a Flow. My concern is that there is a race condition in the code (even if it is really small) where the response comes back before the flow collection is set up. Is there a technique that I can use to make sure that the collection is ready prior to sending my message?
suspend fun sendMessageAwaitResponse(
    sendMessageFunc: suspend () -> Unit,
    rxFlow: Flow<CustomMessage>,
    responseType: Class<out CustomMessage>
): CustomMessage {
    lateinit var response: CustomMessage
    val jobs = listOf(
        lifecycleScope.launch {
            response = rxFlow.first { msg ->
                msg.javaClass == responseType
            }
        },
        lifecycleScope.launch {
            sendMessageFunc()
        },
    )
    jobs.joinAll()
    return response
}
r
I think it depends on what thread your rxFlow starts collecting on (i.e. if there's any .flowOn further up the chain). I'd probably try something like:
suspend fun awaitCallback(
    sendMessageFunc: suspend () -> Unit,
    rxFlow: Flow<CustomMessage>,
): CustomMessage {
    val async = scope.async(start = CoroutineStart.UNDISPATCHED) {
        rxFlow.first()
    }

    sendMessageFunc()

    return async.await()
}
The UNDISPATCHED start means the async block runs immediately, in the caller's thread, until it suspends on .first(). Only then does the outer block resume and call sendMessageFunc().
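For reference, here is a minimal self-contained sketch of the same idea that also keeps the response-type filter from the original question. The names `Message`, `Ack`, `Status` and `sendAndAwait` are made up for illustration, and it uses `coroutineScope` instead of an external `scope`, which is a swap on my part rather than what the snippet above does. It assumes the flow subscribes synchronously when collected (e.g. a `MutableSharedFlow` with no `.flowOn` upstream); otherwise the ordering is not guaranteed. It also matches with `isInstance`, which accepts subclasses, unlike the exact `javaClass ==` check in the question.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical message types standing in for CustomMessage and its subclasses.
sealed interface Message
data class Ack(val id: Int) : Message
data class Status(val text: String) : Message

// Starts collecting before sending, then waits for the first message of the given type.
suspend fun <T : Message> sendAndAwait(
    rxFlow: Flow<Message>,
    responseType: Class<T>,
    send: suspend () -> Unit,
): T = coroutineScope {
    // UNDISPATCHED: the block runs right here until first() suspends,
    // so the collector is already subscribed when send() is called below.
    val response = async(start = CoroutineStart.UNDISPATCHED) {
        responseType.cast(rxFlow.first { responseType.isInstance(it) })
    }
    send()
    response.await()
}

fun main() = runBlocking {
    // No replay buffer: a message emitted before subscription would be lost.
    val rx = MutableSharedFlow<Message>()
    val ack = sendAndAwait(rx, Ack::class.java) {
        rx.emit(Status("busy")) // skipped, wrong type
        rx.emit(Ack(7))         // first matching message
    }
    println(ack) // prints Ack(id=7)
}
```

Because the async is a child of `coroutineScope`, it is cancelled if `send()` throws, so no collector is left dangling.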
d
Thanks for the advice. I dropped that in as a replacement and it seems to be working well. I don't think we're in a situation currently where the original race condition would actually expose itself, so I'll have to stress it later to verify everything
b
We have a similar use case and yes, we've been using start=UNDISPATCHED for years. Works like a charm.
👍 2