I would like to send an event where the receiver m...
# coroutines
v
I would like to send an event where the receiver may not be immediately listening for the event, but when it does, I would like that event to get sent to the receiver. I assumed I could use a Broadcast channel with a capacity of 1 and using the
send
suspend function would suspend the coroutine until receiver started listening but that doesn’t seem to be the case. Can anyone provide some input on how to make this work? Sender class
Copy code
private val _myEvents = BroadcastChannel<String>(1)
val myEvents = _viewEvents.asFlow()

...
fun doSomething() {
   _myEvents.send("sample string")
}
c
From the documentation, it looks like events sent to the channel before it has any subscribers are dropped, rather than being queued https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-broadcast-channel.html
v
Makes sense, any ideas on how I can get this to work without the Broadcast channel?
s
Is this for just events (not stateful) or for values (stateful/conflated)?
v
yes, just for events (not stateful)
s
Also, you say “_*the*_ receiver”. Will there be only one receiver/collector?
v
yes, only one
s
E,g. if there is only one receiver/collector, you can do this:
Copy code
class EventFlow<T>(
    private val channel: Channel<T> = Channel()
) : Flow<T> by channel.receiveAsFlow() {

    /**
     * Emits the value that is assigned to this property onto this [Flow] and suspends until
     * the value has been sent.
     */
    suspend fun sendValue(value: T) {
        channel.send(value)
    }
}
Copy code
val eventFlow = EventFlow<String>()
...
eventFlow.send("sample string")
The call to
send
will suspend until a collector is ready and collects the sent string.
v
Ah, so its a rendezvous channel, and you’re consuming it as a flow
👌 1
s
Yup. Caveat is that there can be only one collector of that (Event)Flow…
v
What happens if you have multiple collectors?
s
The problem with broadcast channels with multiple subscribers/collectors is that some of them may be ready (suspending), other may still be busy doing other stuff (non-suspending); what should
send
on a broadcast channel do in that case…. Nothing; other collectors will not get anything. See the
Channel.receiveAsFlow()
vs
Channel.consumeAsFlow()
in the KDoc for the difference between the two.. you may want to use consumeAsFlow instead….
v
Thanks!
s
We have this snippet of code to support both conflated flows and event flows:
Copy code
typealias ChannelFlow<T> = BaseChannelFlow<T, SendChannel<T>>

abstract class BaseChannelFlow<T, out C : SendChannel<T>> protected constructor(
    protected val channel: C
) : Flow<T> by asFlow(channel) {

    /**
     * Emits the value that is assigned to this property onto this [Flow] and suspends until
     * the value has been sent.
     */
    suspend fun sendValue(value: T) {
        channel.send(value)
    }

    companion object {
        @Suppress("UNCHECKED_CAST")
        private fun <T> asFlow(channel: SendChannel<T>): Flow<T> = when (channel) {
            is BroadcastChannel<T> -> channel.asFlow()
            else -> (channel as? ReceiveChannel<T>)!!.receiveAsFlow()
        }
    }
}

/**
 * A [Flow] of type [T] backed by a state-full conflated broadcast-channel.
 *
 * This class can be used for state-full emissions of UI-data.
 */
class StateFlow<T> : BaseChannelFlow<T, ConflatedBroadcastChannel<T>>(ConflatedBroadcastChannel()) {
    val value: T? get() = channel.valueOrNull
}

/**
 * A [Flow] of type [T] backed by a state-less rendezvous channel.
 *
 * This class can be used for state-less event emissions for UI-navigation, for example.
 */
class EventFlow<T> : BaseChannelFlow<T, Channel<T>>(Channel())
j
Once coroutines 1.4 is out, I believe this will be solved by a
SharedFlow
with replay cache of size 1: https://github.com/Kotlin/kotlinx.coroutines/issues/2034 https://github.com/Kotlin/kotlinx.coroutines/pull/2069
g
SharedFlow + replay(1) can be replaced with StateFlow, it essentially the same semantics, so in case of multiple subscribers all of them receive the same latest event Not sure what kind semantics is required for Victor's case, should event be sent only to one of them, or it always one to one, but I don't think it one to one, it's not very common case for events
Personally I prefer to use consumable event wrapper + StateFlow if I want to have event source where events are not loosing of there are no subscribers
j
I think SharedFlow with replay(1) is only equivalent to StateFlow in one of the buffer overflow policies, not in every case. But I haven't looked into it for some time to be honest
g
Not only buffer overflow, but it behaves the same way for consumer of the stream, it just doesn't provide
value
property
j
What I meant is that if buffer overflow is not
DROP_OLDEST
(meaning it’s either
DROP_LATEST
or
SUSPEND
), then you don’t get the “latest state” behaviour even from the consumer side.
g
Ah, I see what you mean, yeah, sure, you right. I just assumed only default behavior, because other buffer strategies do not have much sense in discussed use case