Hi all, How can I continuously send requests in a ...
# coroutines
a
Hi all, How can I continuously send requests in a Flow without blocking the method? I need to send pings in the background and allow sending additional requests in another coroutine. Here’s my setup:
Copy code
public fun subscribe(requests: Flow<GeyserOuterClass.SubscribeRequest>, headers: Metadata =
        Metadata()): Flow<GeyserOuterClass.SubscribeUpdate>

val reqFlow = MutableStateFlow(
    subscribeRequest {}
)

// Subscribing
client.subscribe(reqFlow).catch { e ->
    println("Flow exception: ${e.message}")
}.collect {
    handle(it, "message")
}

// Pinger to send periodic requests
private fun startPinger(subscription: MutableStateFlow<SubscribeRequest>): Job {
    var counter = 0
    return CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO> + SupervisorJob()).launch {
        try {
            while (isActive) {
                subscription.update {
                    SubscribeRequest.newBuilder()
                        .setPing(
                            GeyserOuterClass.SubscribeRequestPing.newBuilder()
                                .setId(counter++)
                                .build()
                        )
                        .build()
                }
                delay(10000)
            }
        } catch (e: Exception) {
            <http://logger.info|logger.info>("Pinger stopped")
        }
    }
}
The issue: The first request from reqFlow is not sent. How can fix it, or may be some another way?
u
First, you should always hold a reference to your coroutine scope
CoroutineScope(<http://Dispatchers.IO|Dispatchers.IO> + SupervisorJob())
as otherwise it can be garbage collected if all coroutines are suspended.
Besides, i do not really understand your code. Too much missing for me. But another general note. MutablesStateFlows are conflated. So If there is a delay between statPing and actually collecting the flow, then all but your latest update will not be seen by the collector.
You can try by verify this theory by putting a delay before
while (isActive) {
If conflation is your issue, you can switch your MutableStateFlow for a flow builder and emit your updates. Then you can decide to add buffers, or simply let
emit
suspend until there is a receiver. https://kotlinlang.org/docs/flow.html#buffering
Or more suitable for your use case, have one flow builder to create a flow for the pings and one flow for the requests. then use merge to create a flow with pings and requests
Anyway. beware stateflows for events. They are for state. For state only the latest state is relevant.