# coroutines
g
Hi, I'm writing a client that needs to turn an asynchronous exchange into a synchronous call: when it receives a request, the client sends a message to a queuing system and waits for a response on a response queue (one per client instance). When the response arrives, it should be returned to the initial synchronous request. Note that multiple requests can be in flight in parallel on the same client instance. I'm not sure how to implement that. Any suggestions? Is there a coroutine pattern for this situation? Thanks
g
It's hard to give an exact recipe from such an abstract description of your case, but it looks like it shouldn't be hard with coroutines: the "waits" can be replaced with suspending calls. Making multiple requests in parallel is no problem either.
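As a rough illustration of that idea, here is a minimal sketch in which the "wait" is a suspending call and several requests run in parallel. The request function and its 100 ms delay are hypothetical stand-ins for the real queue round trip, not part of the client described above.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for "send a message and wait for the reply":
// delay() suspends the coroutine without blocking the thread.
suspend fun request(id: String): String {
    delay(100)
    return "response-$id"
}

fun main(): Unit = runBlocking {
    // Three requests in flight at once; awaitAll() keeps the input order.
    val results = listOf("1", "2", "3")
        .map { id -> async { request(id) } }
        .awaitAll()
    println(results) // [response-1, response-2, response-3]
}
```

Each async call starts its own coroutine, so the three waits overlap instead of running one after another.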
g
I have found a working implementation using MutableSharedFlow, which I use to receive all responses handled by this client instance, waiting (with a timeout) until the current one arrives:
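import kotlin.random.Random
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow

fun main() = runBlocking {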
    val h = ResponseHandler(this)
    launch {
        try {
            val result = h.sync("1")
            println("result: $result")
        } catch(e: Exception) {
            println(e)
        }
    }

    // Return Unit (not the Job from launch) so main() keeps a valid signature
    Unit
}

class ResponseHandler(
    private val scope: CoroutineScope,
) {
    private val responseFlow = MutableSharedFlow<String>(replay = 0)

    suspend fun sync(callId: String): String {
        var output: String? = null
        val job = scope.launch {
            // Listen for all responses
            withTimeout(4000L) {
                responseFlow.collect {
                    if (it == callId) {
                        output = it
                        // `this` is the withTimeout scope: cancelling it stops the collection
                        this.cancel()
                    }
                }
            }
        }
        // Simulate queue behavior
        useQueue(callId)

        // wait for receiving the asynchronous response
        job.join()

        return output ?: throw Exception("Timeout")
    }

    private suspend fun useQueue(callId: String) {
        // Simulate sending to queue
        println("sending $callId")
        val delay = Random.nextLong(0, 5000)
        delay(delay)
        // Simulate receiving from queue
        println("receiving $callId after $delay")
        responseFlow.emit(callId)
    }
}
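
For comparison, a common alternative for this kind of request/response correlation is to keep one CompletableDeferred per call ID in a map, instead of having every caller collect every response. The sketch below is only an illustration under assumptions: PendingCalls, onResponse, the send lambda and the 100 ms simulated queue are hypothetical names, not part of the code above. The call is registered before the message is sent, so a response that arrives quickly cannot be missed.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Hypothetical alternative to ResponseHandler: one CompletableDeferred per call ID.
class PendingCalls {
    private val pending = ConcurrentHashMap<String, CompletableDeferred<String>>()

    // Registers the call, sends the message, then suspends until the response or the timeout.
    suspend fun sync(callId: String, timeoutMs: Long, send: suspend (String) -> Unit): String {
        val deferred = CompletableDeferred<String>()
        pending[callId] = deferred
        try {
            send(callId)
            // Throws TimeoutCancellationException if no response arrives in time.
            return withTimeout(timeoutMs) { deferred.await() }
        } finally {
            // Always forget the call, whether it completed, timed out or was cancelled.
            pending.remove(callId)
        }
    }

    // Called by whatever consumes the response queue; unknown IDs are ignored.
    fun onResponse(callId: String, payload: String) {
        pending[callId]?.complete(payload)
    }
}

fun main(): Unit = runBlocking {
    val calls = PendingCalls()
    val result = calls.sync("1", timeoutMs = 4000L) { id ->
        // Simulated queue: the response arrives asynchronously after 100 ms.
        launch {
            delay(100)
            calls.onResponse(id, "response-$id")
        }
    }
    println("result: $result") // result: response-1
}
```

Note that in this sketch the timeout only covers the wait for the response, not the send itself.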