Is there a reason <trySendBlocking()> is JVM only?...
# coroutines
j
Is there a reason trySendBlocking() is JVM only? How can I achieve this behavior in Kotlin/Native? My use case is adapting a callback API to Flow, without potentially dropping emissions as
trySend()
could.
Copy code
val flow = callbackFlow {
    val job = addListener {
        trySendBlocking(it) // unavailable in Kotlin/Native
    }
    awaitClose {
        removeListener(job)
    }
}
I suppose I could buffer the flow:
Copy code
val flow = callbackFlow {
    val job = addListener {
        trySend(it)
    }
    awaitClose {
        removeListener(job)
    }
}.buffer()
Helps avoid dropping emissions (up to the buffer capacity). But is there a way to do this blocking the send?
o
I think you are looking for send() instead of trySend()?
j
send()
is a suspend function and there's no coroutine in the listener callback, which is why I'm looking for the blocking equivalent
trySendBlocking()
. I suppose I could also launch a coroutine within the
ProducerScope
provided by the
callbackFlow()
builder for
send()
to suspend in.
Copy code
val flow = callbackFlow {
    val job = addListener {
        launch { send(it) }
    }
    awaitClose {
        removeListener(job)
    }
}
o
What is your usecase?
j
If you want to block, why not
runBlocking { send(it) }
?
b
launching a coroutine for sending may lead to broken order of these events (e,g, if multithreaded dispatcher is used). You can use
Copy code
.buffer(capacity = Channel.UNLIMITED)
if you don't want to drop emission when exceeding buffer capacity
iirc,
runBlocking
is also JVM only
j
My use case is for converting some database change callback APIs to Kotlin Flows in a KMM shared module. That's a great point about launched coroutines not guaranteeing ordering.
runBlocking()
is available in Kotlin/Native. I looked at the JVM
trySendBlocking()
implementation, which uses
runBlocking()
internally. I copied the function to my KMM shared module and it has all the dependencies for Kotlin/Native, only warns about using
InternalCoroutinesApi
with
ChannelResult
. So doesn't seem to be a reason this function isn't available in Kotlin/Native.
Copy code
@Throws(InterruptedException::class)
public fun <E> SendChannel<E>.trySendBlocking(element: E): ChannelResult<Unit> {
    trySend(element).onSuccess { return ChannelResult.success(Unit) }
    return runBlocking {
        val r = runCatching { send(element) }
        if (r.isSuccess) ChannelResult.success(Unit)
        else ChannelResult.closed(r.exceptionOrNull())
    }
}
o
@Jeff Lockhart that's not what I really meant with an usecase. We need to know why you want to do that. What are you trying to achieve? Like a more high level example of what you want
j
I want to use Kotlin Flow APIs instead of callback APIs with the various uses of the database throughout our app. I'm writing extensions to provide these APIs. The use cases could be many different things from listening for database changes, live queries, external sync changes, etc. The APIs are all similar and converting them to Flows all use the same pattern.
o
@Jeff Lockhart Ok, let me try it in another way: why do you not use trySend()?
j
I want to avoid trySend() potentially dropping emissions if the channel is full. trySendBlocking() blocks if this would happen with trySend().
o
Why do you want to avoid potentially dropping emissions?
j
Because they may be emissions that collectors should handle, for example a database change that a subscriber should notify the user about, where not just the last change, but all changes should be handled.
o
Ok thanks that clears it up, that gives me a higher view of your usecase
👍 1
I think if that is what you are trying to achieve, your channel should have an infinite buffer?
j
That may be the best option to guarantee delivery. Also avoids blocking as well. It does seem like
trySendBlocking()
could be added to Kotlin/Native though. I created a YouTrack issue.
o
I think you want to stay away from blocking methods as much as possible
j
The deprecated
sendBlocking()
function is available in Kotlin/Native, which functions similarly, only doesn't catch exceptions to return as a result.