# coroutines
a
Hey, me again 😅 I wonder if there is any preferred (simple?) way to ensure a launched coroutine "A" is ready to collect a SharedFlow before emitting a value from coroutine "B". I have a SharedFlow where all the inbound messages from a WebSocket are being emitted. I want to implement a piece of code that would encapsulate request-response-like behaviour. Something like:
val allInboundMessages: SharedFlow<Message>

suspend fun makeRequestAndWaitForResponse() = coroutineScope {
    val clientToken = UUID.randomUUID().toString()

    val responseJob = async { // would start = CoroutineStart.UNDISPATCHED be enough?
        withTimeout(10.seconds) { // Make sure we don't wait indefinitely
            allInboundMessages.first { it.clientToken == clientToken }
        }
    }

    val request = Request(clientToken)

    webSocket.sendRequest(request) // returns Unit
    // when the response arrives, another coroutine will emit a message in allInboundMessages

    responseJob.await()
}
How do I know if responseJob is already collecting the SharedFlow before sending the request, to make sure the response will be collected?
e
I'd try to build a structured request-reply system out of the websocket channel rather than have this complexity in each of the users, but in principle I think
val sem = Semaphore(permits = 1, acquiredPermits = 1)
val responseJob = async {
    withTimeout(10.seconds) {
        allInboundMessages
            .onStart { sem.release() }
            .first { ... }
    }
}
sem.acquire()
would work (untested)
➕ 1
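The "structured request-reply system" mentioned in this reply is not spelled out in the thread. One possible shape, as a minimal sketch only: keep one pending CompletableDeferred per client token and register it before sending. RequestReplyClient, onInbound, the send callback and the Message fields other than clientToken are hypothetical names chosen for illustration, not part of the original code.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import java.util.concurrent.ConcurrentHashMap
import kotlin.time.Duration.Companion.seconds

// Hypothetical message type; only clientToken appears in the thread.
data class Message(val clientToken: String, val body: String)

// Hypothetical request-reply layer: one pending deferred per client token.
class RequestReplyClient(private val send: suspend (clientToken: String) -> Unit) {
    private val pending = ConcurrentHashMap<String, CompletableDeferred<Message>>()

    // Assumed to be called by whatever reads the WebSocket, once per inbound message.
    fun onInbound(message: Message) {
        pending.remove(message.clientToken)?.complete(message)
    }

    suspend fun request(clientToken: String): Message {
        val reply = CompletableDeferred<Message>()
        pending[clientToken] = reply // registered before sending, so a fast reply is not missed
        try {
            send(clientToken)
            return withTimeout(10.seconds) { reply.await() }
        } finally {
            pending.remove(clientToken) // clean up on timeout or cancellation
        }
    }
}

fun main() = runBlocking {
    lateinit var client: RequestReplyClient
    // Fake transport: the "server" replies synchronously while the request is being sent.
    client = RequestReplyClient { token -> client.onInbound(Message(token, "pong")) }
    println(client.request("abc").body)
}
```

Callers then only see a suspending request function, and the ordering concern (listen first, send second) lives in one place.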
s
I've encountered a problem very similar to this before, and solved it by sending the request at the point of subscribing to the shared flow.
allInboundMessages
    .onSubscription { webSocket.sendRequest(request) }
    .filter { it.clientToken == clientToken }
    .timeout(10.seconds)
    .first()
Using onSubscription instead of onStart is important, otherwise the response could in theory arrive before the collector starts listening for it.
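For anyone wanting to try the onSubscription pattern above in isolation, here is a small self-contained sketch. The requestResponse helper, its send parameter and the Message type are hypothetical stand-ins for the WebSocket code in the question, and the "server" is faked by emitting straight into the shared flow. It uses tryEmit with a buffered flow because a suspending emit from inside the onSubscription action would wait on the very collector that is running the action.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import kotlin.time.Duration.Companion.seconds

// Hypothetical message type; only clientToken appears in the thread.
data class Message(val clientToken: String, val body: String)

// Hypothetical helper: subscribe first, then send, then wait for the matching reply.
suspend fun requestResponse(
    inbound: SharedFlow<Message>,
    clientToken: String,
    send: suspend () -> Unit,
): Message = withTimeout(10.seconds) {
    inbound
        .onSubscription { send() } // runs once this collector is registered with the shared flow
        .first { it.clientToken == clientToken }
}

fun main() = runBlocking {
    // replay = 0 like a live message stream; the extra buffer lets tryEmit succeed without suspending.
    val inbound = MutableSharedFlow<Message>(extraBufferCapacity = 16)

    val reply = requestResponse(inbound, "abc") {
        // Fake server: an unrelated message, then the reply, both "arriving" during the send.
        inbound.tryEmit(Message("other", "noise"))
        inbound.tryEmit(Message("abc", "pong"))
    }
    println(reply.body)
}
```

Even though the fake reply arrives while the request is still being sent, the collector is already subscribed, so the matching message is picked up.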
a
Thanks 🙌
e
isn't onStart invoked after the collector starts? otherwise onStart { emit(...) } wouldn't work
s
After the downstream collector, yes, but before starting to collect the upstream. The docs for onStart talk about it:
Returns a flow that invokes the given action before this flow starts to be collected.
The action is called before the upstream flow is started, so if it is used with a SharedFlow there is no guarantee that emissions from the upstream flow that happen inside or immediately after this onStart action will be collected (see onSubscription for an alternative operator on shared flows).
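The timing difference described in the docs can be observed with a small experiment. This is only a sketch: the probe function is a hypothetical name, the 200 ms timeout is arbitrary, and the flow uses replay = 0 so that values emitted with no subscriber are dropped.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical probe: emits "reply" from inside the callback and reports whether the collector saw it.
suspend fun probe(useOnSubscription: Boolean): String? {
    val shared = MutableSharedFlow<String>(extraBufferCapacity = 8) // replay = 0
    val flow: Flow<String> = if (useOnSubscription) {
        // Action runs after this collector is registered, so the value is buffered for it.
        shared.onSubscription { shared.tryEmit("reply") }
    } else {
        // Action runs before the shared flow is subscribed to, so the value has no subscriber and is dropped.
        shared.onStart { shared.tryEmit("reply") }
    }
    return withTimeoutOrNull(200.milliseconds) { flow.first() }
}

fun main() = runBlocking {
    println("onStart: ${probe(useOnSubscription = false)}")
    println("onSubscription: ${probe(useOnSubscription = true)}")
}
```

With onStart the collector times out and the result is null; with onSubscription the emitted value is received.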
e
but the downstream collector is what OP cares about here
or hmm. actually I see what the timing issue is
ok, I agree onSubscription is the closest callback, for the flows that support it