In my <#C3PQML5NU|multiplatform> project I want to...
# coroutines
e
In my #multiplatform project I want to emit values into a flow in a non-suspending way. A possible solution is to use
MutableSharedFlow#tryEmit
. The flow's first collector might start late and I want to replay early emissions to it (or have it collect them). Though, later collectors should not receive all previous emissions: zero replay. How would I program that?
1
Maybe
MutableSharedFlow#resetReplayCache
can help... blob thinking upside down I should call it after the first subscriber starts and again on every event emission? That sounds very inefficient, right?
s
I think I'd start with a
Channel
, and then add the sharing afterwards.
Copy code
val channel = Channel<T>(Channel.UNLIMITED)
val flow = channel.consumeAsFlow().shareIn(this, SharingStarted.Lazily, replay = 0)
Emit new values using
channel.trySend()
. They'll be buffered in the channel until the flow starts doing its thing. The first consumer of
flow
will cause the sharing to start, which will consume all the values in the channel's buffer. That will empty the buffer, and after that, new subscribers will only see new values.
e
Thanks Sam! I'll see if I can write some tests for this approach. Maybe the flow can be created
by lazy
so we get synchronization (at least on JVM. I also target JS/WASM/Android/iOS)
s
As long as you use
SharingStarted.Lazily
, I don't think you should have to worry about synchronized access. Just create the flow once upfront, and it will sit idle until someone starts using it.
e
Might not actually be a problem if I implement this channel and flow in some wrapper class
👍 1
I'll probably do that, so that I don't have to expose the channel and shared flow interfaces, like
SendChannel#close
, to the public
d
You might want to consider
trySendBlocking
for the unlimited buffer capacity
Channel
. In theory,
trySend
would always succeed, and
trySendBlocking
would never have cause to block, with an unlimited channel; but if your business logic is such that guaranteed delivery is more important than a Thread being temporarily blocked; you should maybe opt for the
-Blocking
form?
e
Nice suggestion, thanks Chris. Unfortunately, it uses
runBlocking
under the hood, and that's unavailable on JS! Furthermore, that's exactly what I first wanted to try:
Copy code
runBlocking {
    while (flow.subsCount == 0) {
        delay(10L)
    }
}
flow.tryEmit(value)
But that would block the UI for a few (up to a hundred) ms only to wait for the first subscriber to appear. Not very nice. So I'll decouple the caller and subscribers through that unlimited channel. Thanks all!
s
@Sam Is it guaranteed that the first collector gets all the emissions before subsequent collectors? What if the second (and later) collectors start collecting before the first one has finished collecting the initial set of emissions (eg the first collector could be slow)
e
I came up with this: https://pl.kotl.in/dM7X5VjcY Note the issue as parameterised by
N
(e.g. set it to
0L
): not only the first, but all subscribers collect the existing values.
s
Maybe another approach could be implement a callback in a Flows onStart and drain the queue into the first one before continuing
e
But I think I can live with that. The most important feature here is that no value gets lost, but late subscribers only receive new values
s
Ah .. that means that the queue doesn't need to be fully drained before next collectors arrive.
e
I thought about something like that Anton, but I think that an existing queue might not actually start emitting values into
onStart
, does it?
s
I'd test the heck out of it to be sure the behavior is what you want 😁
☝️ 1
e
Likewise with
onSubscription
, though that is called a bit later
s
Correct, onStart doesnt start a flow (that'd be weird and a bit for a chicken and egg issue 🙂). But you can emit extra values within the onStart, which maybe could help in your case.
e
v2, using
Copy code
val channel = Channel<Int>(Channel.UNLIMITED)

    val flow = flow {
        emitAll(channel)
        emitAll(channel.consumeAsFlow())
    }
1
Doesn't do much else than simply
val flow = channel.consumeAsFlow()