Jeff Lockhart
10/13/2021, 6:39 AM
trySend() could.
val flow = callbackFlow {
    val job = addListener {
        trySendBlocking(it) // unavailable in Kotlin/Native
    }
    awaitClose {
        removeListener(job)
    }
}
val flow = callbackFlow {
    val job = addListener {
        trySend(it)
    }
    awaitClose {
        removeListener(job)
    }
}.buffer()
Helps avoid dropping emissions (up to the buffer capacity). But is there a way to do this with a blocking send?
Orhan Tozan
10/13/2021, 6:55 AM
Jeff Lockhart
10/13/2021, 7:26 AM
send() is a suspend function and there's no coroutine in the listener callback, which is why I'm looking for the blocking equivalent, trySendBlocking().
I suppose I could also launch a coroutine within the ProducerScope provided by the callbackFlow() builder for send() to suspend in.
val flow = callbackFlow {
    val job = addListener {
        launch { send(it) }
    }
    awaitClose {
        removeListener(job)
    }
}
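A runnable sketch of the launch { send(it) } idea, for anyone who wants to try it. EventSource, listenerCount, asFlow() and demo() are hypothetical names made up for this illustration; they stand in for whatever addListener()/removeListener() API is actually being wrapped. It assumes only the common kotlinx.coroutines APIs, so it should not depend on anything JVM-specific.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical callback-based source (not part of any real library).
class EventSource {
    private val listeners = mutableListOf<(Int) -> Unit>()
    val listenerCount: Int get() = listeners.size

    fun addListener(listener: (Int) -> Unit): (Int) -> Unit {
        listeners += listener
        return listener
    }

    fun removeListener(listener: (Int) -> Unit) {
        listeners -= listener
    }

    // Invokes every registered listener synchronously.
    fun emit(value: Int) = listeners.toList().forEach { it(value) }
}

fun EventSource.asFlow(): Flow<Int> = callbackFlow {
    val listener = addListener { value ->
        // The callback is not a coroutine, so start one in the
        // ProducerScope and let send() suspend there if the buffer is full.
        launch { send(value) }
    }
    awaitClose { removeListener(listener) }
}

fun demo(): List<Int> = runBlocking {
    val source = EventSource()
    val collector = async { source.asFlow().take(3).toList() }
    // Wait until the flow's producer has registered its listener.
    while (source.listenerCount == 0) yield()
    listOf(1, 2, 3).forEach { source.emit(it) }
    collector.await()
}

fun main() {
    println(demo())
}
```

Two caveats with this approach: every event starts a new coroutine, so a slow collector lets pending sends pile up without any bound, and on a multi-threaded dispatcher the launched sends are not guaranteed to complete in the order the callbacks fired.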
Orhan Tozan
10/13/2021, 8:23 AM
Joffrey
10/13/2021, 8:43 AM
runBlocking { send(it) }?
bezrukov
10/13/2021, 8:43 AM
.buffer(capacity = Channel.UNLIMITED) if you don't want to drop emissions when exceeding the buffer capacity
runBlocking is also JVM only
Joffrey
10/13/2021, 8:45 AM
Jeff Lockhart
10/13/2021, 4:33 PM
runBlocking() is available in Kotlin/Native. I looked at the JVM trySendBlocking() implementation, which uses runBlocking() internally. I copied the function to my KMM shared module and it has all the dependencies for Kotlin/Native; it only warns about using InternalCoroutinesApi with ChannelResult. So there doesn't seem to be a reason this function isn't available in Kotlin/Native.
@Throws(InterruptedException::class)
public fun <E> SendChannel<E>.trySendBlocking(element: E): ChannelResult<Unit> {
    trySend(element).onSuccess { return ChannelResult.success(Unit) }
    return runBlocking {
        val r = runCatching { send(element) }
        if (r.isSuccess) ChannelResult.success(Unit)
        else ChannelResult.closed(r.exceptionOrNull())
    }
}
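A small sketch of how the result of the function above might be consumed. It calls the JVM trySendBlocking() that ships with kotlinx.coroutines, so it assumes a JVM target; a copy of the function in a shared module should behave the same way if the body is identical. sendTwo() is a hypothetical helper for this illustration. The buffer always has room when a send is attempted on the open channel, so nothing actually blocks here.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ChannelResult
import kotlinx.coroutines.channels.trySendBlocking

fun sendTwo(): Pair<ChannelResult<Unit>, ChannelResult<Unit>> {
    val channel = Channel<Int>(capacity = 1)
    // Fast path: the buffer has room, so trySend() succeeds immediately.
    val first = channel.trySendBlocking(1)
    channel.close()
    // Slow path: send() throws on the closed channel and the exception
    // is wrapped in a closed ChannelResult instead of being rethrown.
    val second = channel.trySendBlocking(2)
    return first to second
}

fun main() {
    val (first, second) = sendTwo()
    println("first: success=${first.isSuccess}")
    println("second: closed=${second.isClosed}, cause=${second.exceptionOrNull()}")
}
```

One thing to keep in mind: the function blocks the calling thread while the buffer is full, so calling it from the same thread that is supposed to run the collector can deadlock.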
Orhan Tozan
10/13/2021, 4:35 PM
Jeff Lockhart
10/13/2021, 4:39 PM
Orhan Tozan
10/13/2021, 4:40 PM
Jeff Lockhart
10/13/2021, 4:47 PM
Orhan Tozan
10/13/2021, 4:48 PM
Jeff Lockhart
10/13/2021, 4:50 PM
Orhan Tozan
10/13/2021, 4:51 PM
Jeff Lockhart
10/13/2021, 5:20 PM
trySendBlocking() could be added to Kotlin/Native though. I created a YouTrack issue.
Orhan Tozan
10/13/2021, 5:27 PM
Jeff Lockhart
10/13/2021, 5:28 PM
sendBlocking() function is available in Kotlin/Native, which functions similarly, only doesn't catch exceptions to return as a result.
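To make the buffer trade-off from earlier in the thread concrete, here is a minimal sketch using a bare Channel with no receiver. acceptedBy() is a hypothetical helper for this illustration; it counts how many trySend() calls succeed for a given capacity. The same limits apply to the channel behind callbackFlow, which is why trySend() alone can drop emissions.

```kotlin
import kotlinx.coroutines.channels.Channel

// Counts how many of `attempts` non-suspending sends the channel accepts
// when nothing is receiving from it.
fun acceptedBy(capacity: Int, attempts: Int = 5): Int {
    val channel = Channel<Int>(capacity)
    val accepted = (1..attempts).count { channel.trySend(it).isSuccess }
    channel.close()
    return accepted
}

fun main() {
    println(acceptedBy(Channel.RENDEZVOUS)) // 0: no buffer and no waiting receiver
    println(acceptedBy(2))                  // 2: the rest are dropped
    println(acceptedBy(Channel.UNLIMITED))  // 5: never drops, but memory is unbounded
}
```

So Channel.UNLIMITED avoids dropped emissions without blocking, at the cost of unbounded buffering, while a blocking or suspending send keeps the buffer bounded by applying backpressure to the producer.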