Erik
05/01/2024, 9:51 AM
MutableSharedFlow#tryEmit. The flow's first collector might start late and I want to replay early emissions to it (or have it collect them). Though, later collectors should not receive all previous emissions: zero replay. How would I program that?
Erik
05/01/2024, 9:56 AM
MutableSharedFlow#resetReplayCache can help...
I should call it after the first subscriber starts and again on every event emission? That sounds very inefficient, right?
Sam
05/01/2024, 10:17 AM
Channel, and then add the sharing afterwards.
val channel = Channel<T>(Channel.UNLIMITED)
val flow = channel.consumeAsFlow().shareIn(this, SharingStarted.Lazily, replay = 0)
Emit new values using channel.trySend(). They'll be buffered in the channel until the flow starts doing its thing. The first consumer of flow will cause the sharing to start, which will consume all the values in the channel's buffer. That will empty the buffer, and after that, new subscribers will only see new values.
Erik
05/01/2024, 10:29 AM
by lazy so we get synchronization (at least on JVM. I also target JS/WASM/Android/iOS)
Sam
05/01/2024, 10:31 AM
SharingStarted.Lazily, I don't think you should have to worry about synchronized access. Just create the flow once upfront, and it will sit idle until someone starts using it.
Erik
05/01/2024, 10:31 AM
Erik
05/01/2024, 10:33 AM
SendChannel#close, to the public
darkmoon_uk
05/01/2024, 11:25 AM
trySendBlocking for the unlimited buffer capacity Channel. In theory, trySend would always succeed, and trySendBlocking would never have cause to block, with an unlimited channel; but if your business logic is such that guaranteed delivery is more important than a Thread being temporarily blocked, you should maybe opt for the -Blocking form?
Erik
05/01/2024, 11:31 AM
runBlocking under the hood, and that's unavailable on JS!
Furthermore, that's exactly what I first wanted to try:
runBlocking {
    // Poll until the first subscriber appears (this blocks the calling thread).
    while (flow.subscriptionCount.value == 0) {
        delay(10L)
    }
}
flow.tryEmit(value)
But that would block the UI for a few (up to a hundred) ms only to wait for the first subscriber to appear. Not very nice.
So I'll decouple the caller and subscribers through that unlimited channel. Thanks all!
streetsofboston
05/01/2024, 12:02 PM
Erik
05/01/2024, 12:03 PM
N (e.g. set it to 0L): not only the first, but all subscribers collect the existing values.
streetsofboston
05/01/2024, 12:03 PM
Erik
05/01/2024, 12:03 PM
streetsofboston
05/01/2024, 12:04 PM
Erik
05/01/2024, 12:04 PM
onStart, does it?
streetsofboston
05/01/2024, 12:05 PM
Erik
05/01/2024, 12:05 PM
onSubscription, though that is called a bit later
streetsofboston
05/01/2024, 12:07 PM
Erik
05/01/2024, 12:17 PM
val channel = Channel<Int>(Channel.UNLIMITED)
val flow = flow {
    emitAll(channel)
    emitAll(channel.consumeAsFlow())
}
Erik
05/01/2024, 12:18 PM
Erik
05/01/2024, 12:27 PM
val flow = channel.consumeAsFlow()
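
For anyone reading this thread later, here is a minimal sketch of the approach Sam describes above: an unlimited Channel as the buffer, then shareIn with SharingStarted.Lazily and replay = 0. It assumes kotlinx.coroutines is on the classpath. The demo function, the events name, and the delay(100) pauses are made up for illustration and are not from the thread; the delays only give each collector time to subscribe and would not be needed in real code.

```kotlin
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns what the first and the second collector received.
fun demo(): Pair<List<Int>, List<Int>> = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    // Lazily: the channel is not consumed until the first subscriber appears.
    val events = channel.consumeAsFlow().shareIn(this, SharingStarted.Lazily, replay = 0)

    // Early emissions, sent before anyone collects; they wait in the channel buffer.
    channel.trySend(1)
    channel.trySend(2)

    val first = mutableListOf<Int>()
    launch { events.collect { first += it } }
    delay(100) // illustrative pause: let the first collector subscribe and drain the buffer

    val second = mutableListOf<Int>()
    launch { events.collect { second += it } }
    delay(100) // illustrative pause: let the second collector subscribe

    channel.trySend(3) // a new value, delivered to both collectors
    delay(100)

    // Shared flows never complete, so stop the sharing job and both collectors explicitly.
    coroutineContext.cancelChildren()
    first.toList() to second.toList()
}

fun main() {
    val (first, second) = demo()
    println("first collector:  $first")  // [1, 2, 3]
    println("second collector: $second") // [3]
}
```

The first collector sees the buffered values plus everything after; the late collector sees only values sent after it subscribed, with no resetReplayCache calls involved.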