# coroutines
d
I want to share a `Flow` between a fixed number of collectors. Currently I just do `data.shareIn(this, SharingStarted.Lazily)`. Then I let all the collectors collect that shared flow. However, there is a race condition here. In theory the following can happen:
• the first collector starts collecting, which starts the sharing
• the emitter emits values and they are immediately consumed by the already-subscribed first collector
• the rest of the collectors start collecting, but they have missed the first values
I could set `replay` to some number, but that's just a stopgap. Is there a way to say "okay, sharing coroutine, you can start dropping things now, there won't be any further subscribers" after all the subscribers are there?
w
Perhaps you could provide your own implementation of the `SharingStarted` interface, one that only emits the `START` command when you tell it to?
d
Yes, it seems
```kotlin
private fun sharingStartedAfter(nSubs: Int): SharingStarted {
    return object : SharingStarted {
        override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> {
            return subscriptionCount
                .dropWhile { it < nSubs }
                .map { SharingCommand.START }
                .take(1)
        }
    }
}
```
should do what I want
👍 1
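A self-contained sketch of how a helper like this might be used with `shareIn`; the flow contents, subscriber count, and names here are illustrative, not from the thread:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Custom SharingStarted: emit START once, only after nSubs subscribers arrived.
private fun sharingStartedAfter(nSubs: Int): SharingStarted = object : SharingStarted {
    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
        subscriptionCount
            .dropWhile { it < nSubs }     // wait until enough subscribers exist
            .map { SharingCommand.START } // then start the upstream
            .take(1)                      // exactly once
}

fun main() = runBlocking {
    val data = flowOf(1, 2, 3)
    val shared = data.shareIn(this, sharingStartedAfter(nSubs = 2))

    // Both collectors subscribe before the upstream starts, so neither misses values.
    val a = async { shared.take(3).toList() }
    val b = async { shared.take(3).toList() }
    println(a.await()) // [1, 2, 3]
    println(b.await()) // [1, 2, 3]
}
```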
z
A more generalized solution that, I think, is similar in spirit to using `Connectable*` in Rx is to just create a `MutableSharedFlow` and expose that (as a `SharedFlow`), then wire it up to your upstream flow whenever your starting conditions are met:
```kotlin
private val _shared = MutableSharedFlow<Foo>(…)
val shared: SharedFlow<Foo> get() = _shared

// When you're ready to start:
scope.launch {
  upstream.collect(_shared)
}
```
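A minimal runnable sketch of the pattern z describes; the class name, element type, and buffer size are assumptions made for the demo, not part of the original snippet:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class Repo(private val scope: CoroutineScope) {
    // extraBufferCapacity is an arbitrary choice for the demo; tune to your upstream.
    private val _shared = MutableSharedFlow<Int>(extraBufferCapacity = 16)
    val shared: SharedFlow<Int> get() = _shared

    // Start pumping the upstream into the shared flow once your conditions are met.
    fun connect(upstream: Flow<Int>): Job = scope.launch {
        upstream.collect(_shared) // MutableSharedFlow is itself a FlowCollector
    }
}

fun main() = runBlocking {
    val repo = Repo(this)
    val received = async { repo.shared.take(3).toList() }
    yield() // let the collector subscribe before we connect the upstream
    repo.connect(flowOf(1, 2, 3))
    println(received.await()) // [1, 2, 3]
}
```

The key property is that starting is now an explicit call (`connect`), so the race between "upstream starts" and "subscribers arrive" is under the caller's control.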
d
(sorry I cannot get this XXX editor to do what I want 😞 )
z
(yea, I feel your pain - had to fight it to get that snippet to format correctly myself)
d
```kotlin
return coroutineScope {
    val result = async {
        handle(process.stdout) // this is a shared flow
    }
    val stdErr = async {
        process.stderr.toList() // this is another shared flow
    }
    val exitValue = process.run() // this suspends and actually runs the process
    if (exitValue != 0) {
        val stdErrString = stdErr.await().joinToString(separator = "\n")
        throw IOException("Failed to run process. Exit code $exitValue; $stdErrString")
    }
    result.await()
}
```
This is my code for dealing with an external process. However, it does not work, because `run` immediately starts emitting into the shared flow, and at that point the two async blocks have not run yet, so the first value is lost. I can fix this with `replay = 1`, but that's not what I want. I want to wait until both async blocks have subscribed.
z
Pass `start = CoroutineStart.UNDISPATCHED` to `async` to ensure that the initial setup happens synchronously.
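A small sketch of why this helps, using a toy shared flow rather than the process code above: `UNDISPATCHED` runs the coroutine body synchronously up to its first suspension, so the subscription is registered before `async` returns.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val events = MutableSharedFlow<Int>() // replay = 0: late subscribers miss values

    // The body runs synchronously until it suspends inside first(),
    // i.e. the subscription to `events` exists before async() returns.
    val first = async(start = CoroutineStart.UNDISPATCHED) {
        events.first()
    }

    events.emit(42)        // delivered: the subscriber is already registered
    println(first.await()) // 42
}
```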
d
That works, as long as the first suspension that `handle` does is `collect`.
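To illustrate that caveat with a toy example (the `delay` standing in for any suspension `handle` might do before collecting): if the body suspends before subscribing, `UNDISPATCHED` no longer guarantees the subscription happens in time.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val events = MutableSharedFlow<Int>() // replay = 0

    val first = async(start = CoroutineStart.UNDISPATCHED) {
        delay(10)      // suspends before subscribing...
        events.first() // ...so this subscription is registered too late
    }

    events.emit(1) // no subscribers yet: with replay = 0 the value is dropped

    val got = withTimeoutOrNull(200) { first.await() }
    println(got)   // null: the async block missed the value
    first.cancel() // clean up the stuck collector
}
```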
z
Yes, which could be brittle. It sounds like the custom `SharingStarted` is probably a better approach for this, since I think it guarantees synchronization.
d
I dont think that works here, since I am dealing with two flows (stdout and stderr). And it would also mean the process would need to know how many subscribers there are
I am keeping the UNDISPATCHED solution, because it works for my case