https://kotlinlang.org logo
Title
j

Jeff Lockhart

10/13/2021, 6:39 AM
Is there a reason trySendBlocking() is JVM only? How can I achieve this behavior in Kotlin/Native? My use case is adapting a callback API to Flow, without potentially dropping emissions as
trySend()
could.
val flow = callbackFlow {
    val job = addListener {
        trySendBlocking(it) // unavailable in Kotlin/Native
    }
    awaitClose {
        removeListener(job)
    }
}
I suppose I could buffer the flow:
val flow = callbackFlow {
    val job = addListener {
        trySend(it)
    }
    awaitClose {
        removeListener(job)
    }
}.buffer()
Helps avoid dropping emissions (up to the buffer capacity). But is there a way to do this blocking the send?
o

Orhan Tozan

10/13/2021, 6:55 AM
I think you are looking for send() instead of trySend()?
j

Jeff Lockhart

10/13/2021, 7:26 AM
send()
is a suspend function and there's no coroutine in the listener callback, which is why I'm looking for the blocking equivalent
trySendBlocking()
. I suppose I could also launch a coroutine within the
ProducerScope
provided by the
callbackFlow()
builder for
send()
to suspend in.
val flow = callbackFlow {
    val job = addListener {
        launch { send(it) }
    }
    awaitClose {
        removeListener(job)
    }
}
o

Orhan Tozan

10/13/2021, 8:23 AM
What is your usecase?
j

Joffrey

10/13/2021, 8:43 AM
If you want to block, why not
runBlocking { send(it) }
?
b

bezrukov

10/13/2021, 8:43 AM
launching a coroutine for sending may lead to broken order of these events (e,g, if multithreaded dispatcher is used). You can use
.buffer(capacity = Channel.UNLIMITED)
if you don't want to drop emission when exceeding buffer capacity
iirc,
runBlocking
is also JVM only
j

Jeff Lockhart

10/13/2021, 4:33 PM
My use case is for converting some database change callback APIs to Kotlin Flows in a KMM shared module. That's a great point about launched coroutines not guaranteeing ordering.
runBlocking()
is available in Kotlin/Native. I looked at the JVM
trySendBlocking()
implementation, which uses
runBlocking()
internally. I copied the function to my KMM shared module and it has all the dependencies for Kotlin/Native, only warns about using
InternalCoroutinesApi
with
ChannelResult
. So doesn't seem to be a reason this function isn't available in Kotlin/Native.
@Throws(InterruptedException::class)
public fun <E> SendChannel<E>.trySendBlocking(element: E): ChannelResult<Unit> {
    trySend(element).onSuccess { return ChannelResult.success(Unit) }
    return runBlocking {
        val r = runCatching { send(element) }
        if (r.isSuccess) ChannelResult.success(Unit)
        else ChannelResult.closed(r.exceptionOrNull())
    }
}
o

Orhan Tozan

10/13/2021, 4:35 PM
@Jeff Lockhart that's not what I really meant with an usecase. We need to know why you want to do that. What are you trying to achieve? Like a more high level example of what you want
j

Jeff Lockhart

10/13/2021, 4:39 PM
I want to use Kotlin Flow APIs instead of callback APIs with the various uses of the database throughout our app. I'm writing extensions to provide these APIs. The use cases could be many different things from listening for database changes, live queries, external sync changes, etc. The APIs are all similar and converting them to Flows all use the same pattern.
o

Orhan Tozan

10/13/2021, 4:40 PM
@Jeff Lockhart Ok, let me try it in another way: why do you not use trySend()?
j

Jeff Lockhart

10/13/2021, 4:47 PM
I want to avoid trySend() potentially dropping emissions if the channel is full. trySendBlocking() blocks if this would happen with trySend().
o

Orhan Tozan

10/13/2021, 4:48 PM
Why do you want to avoid potentially dropping emissions?
j

Jeff Lockhart

10/13/2021, 4:50 PM
Because they may be emissions that collectors should handle, for example a database change that a subscriber should notify the user about, where not just the last change, but all changes should be handled.
o

Orhan Tozan

10/13/2021, 4:51 PM
Ok thanks that clears it up, that gives me a higher view of your usecase
👍 1
I think if that is what you are trying to achieve, your channel should have an infinite buffer?
j

Jeff Lockhart

10/13/2021, 5:20 PM
That may be the best option to guarantee delivery. Also avoids blocking as well. It does seem like
trySendBlocking()
could be added to Kotlin/Native though. I created a YouTrack issue.
o

Orhan Tozan

10/13/2021, 5:27 PM
I think you want to stay away from blocking methods as much as possible
j

Jeff Lockhart

10/13/2021, 5:28 PM
The deprecated
sendBlocking()
function is available in Kotlin/Native, which functions similarly, only doesn't catch exceptions to return as a result.