diesieben07
11/17/2020, 2:36 PM
Flow between a fixed number of collectors. Currently I just do data.shareIn(this, SharingStarted.Lazily). Then I let all the collectors collect that shared flow. However, there is a race condition here. In theory the following can happen:
• first collector starts collecting, this starts the sharing
• the emitter emits things and they are immediately consumed by the already subscribed first collector.
• the rest of the collectors start collecting, but they missed out on the first values.
I could set replay to some number, but that's just a stopgap. Is there a way to say "okay, sharing coroutine, you can start dropping things now, there won't be any further subscribers" after all the subscribers are there?
wasyl
11/17/2020, 2:50 PM
SharingStarted interface, one that only emits START command when you tell it to?
diesieben07
11/17/2020, 2:52 PM
private fun sharingStartedAfter(nSubs: Int): SharingStarted {
    return object : SharingStarted {
        // Emit START once, as soon as at least nSubs collectors have subscribed; no STOP is ever sent.
        override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> {
            return subscriptionCount
                .dropWhile { it < nSubs }
                .map { SharingCommand.START }
                .take(1)
        }
    }
}
should do what I want
Dominaezzz
11/17/2020, 3:37 PM
Zach Klippenstein (he/him) [MOD]
11/17/2020, 3:55 PM
private val _shared = MutableSharedFlow<Foo>(…)
val shared: SharedFlow<Foo> get() = _shared
// When you’re ready to start:
scope.launch {
    upstream.collect(_shared)
}
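The two suggestions so far (the custom SharingStarted and the hand-driven MutableSharedFlow) can be tried side by side. The following is only a minimal sketch, assuming kotlinx.coroutines 1.4 or later; viaCustomStrategy and viaManualStart are hypothetical helper names, the upstream is a plain range of integers, and "ready to start" is assumed to mean that every expected collector has subscribed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Strategy from earlier in the thread: START once nSubs collectors have subscribed.
fun sharingStartedAfter(nSubs: Int): SharingStarted = object : SharingStarted {
    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
        subscriptionCount
            .dropWhile { it < nSubs }
            .map { SharingCommand.START }
            .take(1)
}

// Variant 1 (hypothetical helper): shareIn with the custom strategy.
suspend fun viaCustomStrategy(collectors: Int, count: Int): List<List<Int>> = coroutineScope {
    // Separate scope, so the sharing coroutine is not a child of this coroutineScope
    // and can be cancelled explicitly once the collectors are done.
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val shared = (1..count).asFlow().shareIn(sharingScope, sharingStartedAfter(collectors))
    // Each collector takes the first `count` values and then completes.
    val results = List(collectors) { async { shared.take(count).toList() } }.awaitAll()
    sharingScope.cancel()
    results
}

// Variant 2 (hypothetical helper): drive a MutableSharedFlow by hand.
suspend fun viaManualStart(collectors: Int, count: Int): List<List<Int>> = coroutineScope {
    val shared = MutableSharedFlow<Int>()
    val results = List(collectors) { async { shared.take(count).toList() } }
    // Assumed definition of "ready": every expected collector has subscribed.
    shared.subscriptionCount.first { it >= collectors }
    // Only now start pumping the upstream into the shared flow.
    launch { (1..count).asFlow().collect { shared.emit(it) } }
    results.awaitAll()
}
```

Called from a runBlocking block, viaCustomStrategy(3, 5) and viaManualStart(3, 5) should each hand every collector the full sequence, since in both variants nothing is emitted before the last collector has subscribed.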
diesieben07
11/17/2020, 5:39 PM
Zach Klippenstein (he/him) [MOD]
11/17/2020, 5:40 PM
diesieben07
11/17/2020, 5:42 PM
return coroutineScope {
    val result = async {
        handle(process.stdout) // this is a shared flow
    }
    val stdErr = async {
        process.stderr.toList() // this is another shared flow
    }
    val exitValue = process.run() // this suspends and actually runs the process
    if (exitValue != 0) {
        val stdErrString = stdErr.await().joinToString(separator = "\n")
        throw IOException("Failed to run process Exit code $exitValue; $stdErrString")
    }
    result.await()
}
This is my code for dealing with an external process. However, it does not work, because run immediately starts emitting into the shared flow, and at that point the two async blocks have not run yet, so the first value is lost. I can fix this with replay = 1, but that's not what I want. I want to wait until both async blocks have subscribed.
Zach Klippenstein (he/him) [MOD]
11/17/2020, 5:51 PM
start = CoroutineStart.UNDISPATCHED to async to ensure that initial setup happens synchronously.
diesieben07
11/17/2020, 5:53 PM
handle does is collect
Zach Klippenstein (he/him) [MOD]
11/17/2020, 6:11 PM
diesieben07
11/17/2020, 6:12 PM
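To illustrate the start = CoroutineStart.UNDISPATCHED suggestion from earlier in the thread, here is a rough sketch rather than the actual process wrapper. FakeProcess and runAndCollect are hypothetical stand-ins: the fake process emits two lines into a MutableSharedFlow with no replay, and the collector is started with whichever CoroutineStart is passed in. The comparison with CoroutineStart.DEFAULT is only deterministic on a single-threaded dispatcher such as the one runBlocking provides.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the process wrapper discussed in the thread.
class FakeProcess {
    private val _stdout = MutableSharedFlow<String>() // replay = 0, no extra buffer
    val stdout: SharedFlow<String> get() = _stdout

    // Emits two lines right away, then "exits" with code 0.
    suspend fun run(): Int {
        _stdout.emit("first")
        _stdout.emit("second")
        return 0
    }
}

// Hypothetical helper: collect stdout with the given start mode while the process runs.
suspend fun runAndCollect(start: CoroutineStart): List<String> = coroutineScope {
    val process = FakeProcess()
    // With UNDISPATCHED the body runs immediately up to its first suspension point,
    // which is inside collect, i.e. after the subscription has been registered.
    val stdout = async(start = start) { process.stdout.take(2).toList() }
    process.run()
    // A collector that subscribed too late never sees both lines; give up after a short wait.
    withTimeoutOrNull(200) { stdout.await() } ?: run {
        stdout.cancel()
        emptyList<String>()
    }
}
```

Under runBlocking, runAndCollect(CoroutineStart.UNDISPATCHED) should return both lines, while runAndCollect(CoroutineStart.DEFAULT) is expected to come back empty, because run() emits before the async block has had a chance to subscribe.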