voben
09/11/2020, 3:32 PMsend
suspend function would suspend the coroutine until receiver started listening but that doesn’t seem to be the case. Can anyone provide some input on how to make this work?
Sender class
private val _myEvents = BroadcastChannel<String>(1)
val myEvents = _viewEvents.asFlow()
...
fun doSomething() {
_myEvents.send("sample string")
}
Casey Brooks
09/11/2020, 3:35 PMvoben
09/11/2020, 3:37 PMstreetsofboston
09/11/2020, 3:44 PMvoben
09/11/2020, 3:46 PMstreetsofboston
09/11/2020, 3:54 PMvoben
09/11/2020, 3:55 PMstreetsofboston
09/11/2020, 3:56 PMclass EventFlow<T>(
private val channel: Channel<T> = Channel()
) : Flow<T> by channel.receiveAsFlow() {
/**
* Emits the value that is assigned to this property onto this [Flow] and suspends until
* the value has been sent.
*/
suspend fun sendValue(value: T) {
channel.send(value)
}
}
val eventFlow = EventFlow<String>()
...
eventFlow.send("sample string")
The call to send
will suspend until a collector is ready and collects the sent string.voben
09/11/2020, 3:58 PMstreetsofboston
09/11/2020, 3:58 PMvoben
09/11/2020, 3:59 PMstreetsofboston
09/11/2020, 4:01 PMsend
on a broadcast channel do in that case….
Nothing; other collectors will not get anything.
See the Channel.receiveAsFlow()
vs Channel.consumeAsFlow()
in the KDoc for the difference between the two.. you may want to use consumeAsFlow instead….voben
09/11/2020, 4:02 PMstreetsofboston
09/11/2020, 4:05 PMtypealias ChannelFlow<T> = BaseChannelFlow<T, SendChannel<T>>
abstract class BaseChannelFlow<T, out C : SendChannel<T>> protected constructor(
protected val channel: C
) : Flow<T> by asFlow(channel) {
/**
* Emits the value that is assigned to this property onto this [Flow] and suspends until
* the value has been sent.
*/
suspend fun sendValue(value: T) {
channel.send(value)
}
companion object {
@Suppress("UNCHECKED_CAST")
private fun <T> asFlow(channel: SendChannel<T>): Flow<T> = when (channel) {
is BroadcastChannel<T> -> channel.asFlow()
else -> (channel as? ReceiveChannel<T>)!!.receiveAsFlow()
}
}
}
/**
* A [Flow] of type [T] backed by a state-full conflated broadcast-channel.
*
* This class can be used for state-full emissions of UI-data.
*/
class StateFlow<T> : BaseChannelFlow<T, ConflatedBroadcastChannel<T>>(ConflatedBroadcastChannel()) {
val value: T? get() = channel.valueOrNull
}
/**
* A [Flow] of type [T] backed by a state-less rendezvous channel.
*
* This class can be used for state-less event emissions for UI-navigation, for example.
*/
class EventFlow<T> : BaseChannelFlow<T, Channel<T>>(Channel())
Joffrey
09/12/2020, 9:58 AMSharedFlow
with replay cache of size 1:
https://github.com/Kotlin/kotlinx.coroutines/issues/2034
https://github.com/Kotlin/kotlinx.coroutines/pull/2069gildor
09/13/2020, 12:30 AMJoffrey
09/13/2020, 8:31 AMgildor
09/13/2020, 2:47 PMvalue
propertyJoffrey
09/13/2020, 10:20 PMDROP_OLDEST
(meaning it’s either DROP_LATEST
or SUSPEND
), then you don’t get the “latest state” behaviour even from the consumer side.gildor
09/13/2020, 11:49 PM