Gilles Barbier
01/30/2021, 6:04 PMgildor
01/31/2021, 12:30 PMGilles Barbier
01/31/2021, 12:43 PMMutableSharedFlow
, that I use to receive all response managed by this client instance, and wait (with a timeout) while I do not receive the current one:
fun main() = runBlocking {
val h = ResponseHandler(this)
launch {
try {
val result = h.sync("1")
println("result: $result")
} catch(e: Exception) {
println(e)
}
}
Unit
}
class ResponseHandler(
private val scope: CoroutineScope,
) {
private val responseFlow = MutableSharedFlow<String>(replay = 0)
suspend fun sync(callId: String): String {
var output: String? = null
val job = scope.launch {
// Listen for all responses
withTimeout(4000L) {
responseFlow.collect {
if (it == callId) {
output = it
this.cancel()
}
}
}
}
// Simulate queue behavior
useQueue(callId)
// wait for receiving the asynchronous response
job.join()
return output ?: throw Exception("Timeout")
}
private suspend fun useQueue(callId: String) {
// Simulate sending to queue
println("sending $callId")
val delay = Random.nextLong(0, 5000)
delay(delay)
// Simulate receiving from queue
println("receiving $callId after $delay")
responseFlow.emit(callId)
}
}