azabost
09/29/2023, 10:04 PM
SharedFlow before emitting a value from coroutine "B".
I have a SharedFlow where all the inbound messages from a WebSocket are being emitted.
I want to implement a piece of code that would encapsulate a request-response like behaviour. Something like:
val allInboundMessages: SharedFlow<Message>

suspend fun makeRequestAndWaitForResponse() = coroutineScope {
    val clientToken = UUID.randomUUID().toString()
    val responseJob = async { // would start = CoroutineStart.UNDISPATCHED be enough?
        withTimeout(10.seconds) { // Make sure we don't wait indefinitely
            allInboundMessages.first { it.clientToken == clientToken }
        }
    }
    val request = Request(clientToken)
    webSocket.sendRequest(request) // returns Unit
    // when the response arrives, another coroutine will emit a message in allInboundMessages
    responseJob.await()
}
How do I know if responseJob is already collecting SharedFlow before sending the request, to make sure the response will be collected?
ephemient
09/29/2023, 11:37 PM
val sem = Semaphore(permits = 1, acquiredPermits = 1)
val responseJob = async {
    withTimeout(10.seconds) {
        allInboundMessages
            .onStart { sem.release() }
            .first { ... }
    }
}
sem.acquire()
would work (untested)
Sam
09/30/2023, 6:45 AM
allInboundMessages
    .onSubscription { webSocket.sendRequest(request) }
    .filter { it.clientToken == clientToken }
    .timeout(10.seconds)
    .first()
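A minimal runnable sketch of this approach, applied to the function from the question. The `Message`, `Request`, and `FakeWebSocket` types are hypothetical stand-ins (the thread does not show the real ones), and the fake socket answers immediately by emitting into the inbound flow, which is the worst case for the race. It uses `withTimeout` from the original question rather than the `timeout` operator.

```kotlin
import java.util.UUID
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Hypothetical stand-ins for the Message and Request types in the question.
data class Message(val clientToken: String)
data class Request(val clientToken: String)

// Hypothetical fake socket: it "responds" immediately by emitting a matching
// Message into the inbound flow (assumes the buffer has room for it).
class FakeWebSocket(private val inbound: MutableSharedFlow<Message>) {
    fun sendRequest(request: Request) {
        inbound.tryEmit(Message(request.clientToken))
    }
}

suspend fun makeRequestAndWaitForResponse(
    allInboundMessages: SharedFlow<Message>,
    webSocket: FakeWebSocket,
): Message {
    val clientToken = UUID.randomUUID().toString()
    // withTimeout covers both sending and waiting, so we never wait indefinitely.
    return withTimeout(10.seconds) {
        allInboundMessages
            // Runs only after the subscription is registered, so a response
            // emitted from this point on is delivered to this collector.
            .onSubscription { webSocket.sendRequest(Request(clientToken)) }
            .first { it.clientToken == clientToken }
    }
}

fun main() = runBlocking {
    val inbound = MutableSharedFlow<Message>(extraBufferCapacity = 1)
    val response = makeRequestAndWaitForResponse(inbound, FakeWebSocket(inbound))
    println("Got response for ${response.clientToken}")
}
```

No separate `async` job or semaphore is needed here, because the request is sent from inside the collector once it is subscribed.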
Using onSubscription instead of onStart is important, otherwise the response could in theory arrive before the collector starts listening for it.
azabost
09/30/2023, 9:46 AM
ephemient
09/30/2023, 1:42 PM
isn't onStart invoked after the collector starts? otherwise onStart { emit(...) } wouldn't work
Sam
09/30/2023, 1:47 PM
The docs for onStart talk about it:
Returns a flow that invokes the given action before this flow starts to be collected.
The action is called before the upstream flow is started, so if it is used with a SharedFlow there is no guarantee that emissions from the upstream flow that happen inside or immediately after this onStart action will be collected (see onSubscription for an alternative operator on shared flows).
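The difference described in that documentation can be reproduced with a small sketch. It assumes a `MutableSharedFlow` with `replay = 0` and one slot of extra buffer, and it emits into the shared flow from inside each callback. The function names and the 200 ms timeout are illustrative choices, not anything from the thread.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Emits from inside onStart: the subscription is not registered yet, so with
// replay = 0 the value is dropped and first() times out.
suspend fun emitInsideOnStart(): String? {
    val shared = MutableSharedFlow<String>(extraBufferCapacity = 1)
    return withTimeoutOrNull(200L) {
        shared.onStart { shared.tryEmit("response") }.first()
    }
}

// Emits from inside onSubscription: the subscription already exists, so the
// value is buffered for this collector and first() returns it.
suspend fun emitInsideOnSubscription(): String? {
    val shared = MutableSharedFlow<String>(extraBufferCapacity = 1)
    return withTimeoutOrNull(200L) {
        shared.onSubscription { shared.tryEmit("response") }.first()
    }
}

fun main() = runBlocking {
    println(emitInsideOnStart())        // expected: null
    println(emitInsideOnSubscription()) // expected: response
}
```

Note that `onStart { emit(...) }` still works for its usual purpose, because there `emit` sends a value downstream to the collector rather than into the upstream shared flow.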
ephemient
09/30/2023, 1:49 PMephemient
09/30/2023, 1:50 PMephemient
09/30/2023, 1:52 PM
onSubscription is the closest callback, for the flows that support it