azabost
09/29/2023, 10:04 PM
… SharedFlow before emitting a value from coroutine "B".
I have a SharedFlow where all the inbound messages from a WebSocket are being emitted.
I want to implement a piece of code that would encapsulate a request-response like behaviour. Something like:
val allInboundMessages: SharedFlow<Message>

suspend fun makeRequestAndWaitForResponse() = coroutineScope {
    val clientToken = UUID.randomUUID().toString()
    val responseJob = async { // would start = CoroutineStart.UNDISPATCHED be enough?
        withTimeout(10.seconds) { // Make sure we don't wait indefinitely
            allInboundMessages.first { it.clientToken == clientToken }
        }
    }
    val request = Request(clientToken)
    webSocket.sendRequest(request) // returns Unit
    // when the response arrives, another coroutine will emit a message in allInboundMessages
    responseJob.await()
}
How do I know if responseJob is already collecting the SharedFlow before sending the request, to make sure the response will be collected?
ephemient
09/29/2023, 11:37 PM
val sem = Semaphore(permits = 1, acquiredPermits = 1)
val responseJob = async {
    withTimeout(10.seconds) {
        allInboundMessages
            .onStart { sem.release() }
            .first { ... }
    }
}
sem.acquire()
would work (untested)
Sam
09/30/2023, 6:45 AM
allInboundMessages
    .onSubscription { webSocket.sendRequest(request) }
    .filter { it.clientToken == clientToken }
    .timeout(10.seconds)
    .first()
Using onSubscription instead of onStart is important, otherwise the response could in theory arrive before the collector starts listening for it.
azabost
09/30/2023, 9:46 AM
ephemient
09/30/2023, 1:42 PM
… onStart invoked after the collector starts? otherwise onStart { emit(...) } wouldn't work
Sam
09/30/2023, 1:47 PM
… onStart talk about it:
Returns a flow that invokes the given action before this flow starts to be collected.
The action is called before the upstream flow is started, so if it is used with a SharedFlow there is no guarantee that emissions from the upstream flow that happen inside or immediately after this onStart action will be collected (see onSubscription for an alternative operator on shared flows).
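The difference described in the quoted docs can be reproduced with a small sketch. It assumes a `MutableSharedFlow` with no replay and `extraBufferCapacity = 1`, and it uses `tryEmit` inside each callback to stand in for a response that arrives while the callback is still running. The function names and the 500 ms timeout are made up for illustration.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// onStart runs before the collector subscribes to the shared flow,
// so a value emitted inside it has no subscriber and is dropped (replay = 0).
suspend fun firstWithOnStart(): String? {
    val shared = MutableSharedFlow<String>(extraBufferCapacity = 1)
    return withTimeoutOrNull(500) {
        shared.onStart { shared.tryEmit("reply") }.first()
    }
}

// onSubscription runs after the subscription is registered,
// so a value emitted inside it is buffered for this collector and delivered.
suspend fun firstWithOnSubscription(): String? {
    val shared = MutableSharedFlow<String>(extraBufferCapacity = 1)
    return withTimeoutOrNull(500) {
        shared.onSubscription { shared.tryEmit("reply") }.first()
    }
}

fun main() = runBlocking {
    println("onStart:        ${firstWithOnStart()}")        // expected: null (timed out)
    println("onSubscription: ${firstWithOnSubscription()}") // expected: reply
}
```

The extra buffer slot matters here: with the default rendezvous configuration, a suspending `shared.emit(...)` called from inside `onSubscription` would wait for the very collector that is still running the callback.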
ephemient
09/30/2023, 1:49 PM
ephemient
09/30/2023, 1:50 PM
ephemient
09/30/2023, 1:52 PM
onSubscription is the closest callback, for the flows that support it
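Putting the thread's conclusion together, here is a minimal self-contained sketch of the request-response helper built on `onSubscription`. `FakeWebSocket`, the `body` field on `Message`, and the function parameters are hypothetical stand-ins for the real socket and message types, which the thread does not show. It uses `withTimeout` around `first { ... }` as in the original question, rather than the `Flow.timeout` operator, which may require an opt-in depending on the kotlinx.coroutines version.

```kotlin
import java.util.UUID
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Hypothetical message types; only clientToken appears in the original thread.
data class Message(val clientToken: String, val body: String)
data class Request(val clientToken: String)

// Hypothetical stand-in for the real socket: it immediately "receives" a
// response and publishes it to the shared inbound flow.
class FakeWebSocket(private val inbound: MutableSharedFlow<Message>) {
    fun sendRequest(request: Request) {
        inbound.tryEmit(Message(request.clientToken, "pong"))
    }
}

suspend fun makeRequestAndWaitForResponse(
    allInboundMessages: SharedFlow<Message>,
    webSocket: FakeWebSocket,
): Message {
    val clientToken = UUID.randomUUID().toString()
    return withTimeout(10.seconds) { // make sure we don't wait indefinitely
        allInboundMessages
            // Send only once this collector is subscribed, so the response cannot be missed.
            .onSubscription { webSocket.sendRequest(Request(clientToken)) }
            .first { it.clientToken == clientToken }
    }
}

fun main() = runBlocking {
    val inbound = MutableSharedFlow<Message>(extraBufferCapacity = 16)
    val response = makeRequestAndWaitForResponse(inbound, FakeWebSocket(inbound))
    println(response.body) // prints "pong"
}
```

Because the request is sent from inside the subscription callback, no separate `async` job or semaphore is needed to order the subscription before the send.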