pakoito
09/14/2022, 3:59 PM
val pub = MutableSharedFlow<WorkJob>().apply {
    onEach { println("Each") }
        .onStart { println("Start") }
        .onCompletion { println("Complete") }
        .launchIn(GlobalScope)
}
...
somewhere else in code:
pub.tryEmit(WorkJob())
That triggers "start" but doesn't emit any value. The moment I put MutableSharedFlow<Job>(replay = 10) it works, printing "start" and "each". But I want an infinite stream, not a repetition or something that goes to the latest value.

Casey Brooks
09/14/2022, 4:05 PM
Channel instead. You can easily consume the Channel as a Flow with channel.receiveAsFlow() for a nicer read API.
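A minimal sketch of the Channel-as-job-queue idea described above, assuming kotlinx.coroutines is on the classpath. The `WorkJob` data class and the `runQueue` helper are hypothetical stand-ins, not code from the thread. It shows that values sent before any collector exists wait in the channel and are each delivered once when the channel is read through `receiveAsFlow()`.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the WorkJob type from the question.
data class WorkJob(val id: Int)

fun runQueue(): List<WorkJob> = runBlocking {
    // An unlimited buffer is an assumption here; trySend succeeds until the channel is closed.
    val queue = Channel<WorkJob>(Channel.UNLIMITED)

    // No collector exists yet, so the jobs wait in the channel's buffer.
    queue.trySend(WorkJob(1))
    queue.trySend(WorkJob(2))
    queue.close() // closing lets the flow below complete after draining the buffer

    // Each buffered job is received exactly once.
    queue.receiveAsFlow().toList()
}

fun main() {
    println(runQueue()) // prints [WorkJob(id=1), WorkJob(id=2)]
}
```

In a long-running queue the channel would stay open and the flow would be collected in its own coroutine; `close()` is only used here so the example terminates.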
MutableSharedFlow is intended for exactly that, sharing values among multiple subscribers. It's not designed to replace Channel as a job queue.

pakoito
09/14/2022, 4:20 PM
Sinks.many().unicast().onBackpressureBuffer<Job>()

Casey Brooks
09/14/2022, 4:27 PM
MutableSharedFlow, by default it intentionally drops emissions if there are no subscribers. You can configure it to not necessarily do that, but extraBufferCapacity is what you're looking for instead of replay. That would give you the kind of behaviour you're looking for.
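A small sketch of the `extraBufferCapacity` suggestion, assuming kotlinx.coroutines. The helper name `tryEmitWithCapacity` and the 200 ms timeout are illustrative choices, not part of the thread. It registers one subscriber, calls `tryEmit` once, and reports whether the value was accepted and what the subscriber received. With the default unbuffered flow, `tryEmit` returns false while a subscriber is present, because there is no buffer slot to place the value in and `tryEmit` cannot suspend.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Returns (did tryEmit accept the value, what the subscriber received or null on timeout).
fun tryEmitWithCapacity(extra: Int): Pair<Boolean, Int?> = runBlocking {
    val pub = MutableSharedFlow<Int>(extraBufferCapacity = extra)

    // One subscriber that waits briefly for the first value.
    val received = async { withTimeoutOrNull(200) { pub.first() } }

    // Wait until the subscriber is actually registered before emitting.
    pub.subscriptionCount.first { it > 0 }

    val accepted = pub.tryEmit(42)
    accepted to received.await()
}

fun main() {
    println(tryEmitWithCapacity(0)) // prints (false, null)
    println(tryEmitWithCapacity(1)) // prints (true, 42)
}
```

Unlike `replay`, `extraBufferCapacity` does not hand old values to subscribers that arrive later; it only gives `tryEmit` room to hand values to subscribers that already exist.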
But ultimately, the decision between MutableSharedFlow and Channel comes down to how you need to read those values. 1 value processed exactly once? Use a Channel. 1 value processed by multiple subscribers? Use MutableSharedFlow. The API for both is similar, so it's a matter of using the right tool for the job, rather than trying to use MutableSharedFlow when a Channel would work better, just because it's newer.

pakoito
09/14/2022, 4:28 PM

pakoito
09/14/2022, 4:28 PM

pakoito
09/14/2022, 4:28 PM

Casey Brooks
09/14/2022, 4:30 PM

simon.vergauwen
09/15/2022, 7:18 AM
Channel for this @pakoito, you can call close in the same way you'd call onError or onComplete for the Subject.
Using the suspend on overflow settings for Channel instead of the default buffer one.
I then consumeAsFlow and start/stop my event loop using onStart/onCompletion.
https://github.com/nomisRev/kotlin-kafka/blob/a5bf31fba92038d2b16630731b8a77527f0a[…]/kotlin/io/github/nomisRev/kafka/receiver/internals/PollLoop.kt
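A rough sketch of the pattern described in this last message, assuming kotlinx.coroutines. It is not taken from the linked PollLoop.kt: the `runEventLoop` helper, the capacity of 1, and the three-value producer are all hypothetical. A Channel configured with `BufferOverflow.SUSPEND` is read through `consumeAsFlow()`, the producer loop is started in `onStart` and stopped in `onCompletion`, and `close()` plays the role that onComplete would for a Subject.

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun runEventLoop(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    // Capacity 1 is an arbitrary choice; SUSPEND makes send() wait when the buffer is full.
    val events = Channel<Int>(capacity = 1, onBufferOverflow = BufferOverflow.SUSPEND)
    var loop: Job? = null

    events.consumeAsFlow()
        .onStart {
            log += "start"
            // Hypothetical event loop: produce three values, then signal completion.
            loop = launch {
                for (i in 1..3) events.send(i)
                events.close()
            }
        }
        .onEach { log += "value $it" }
        .onCompletion {
            loop?.cancel() // stop the loop if the collector finishes first
            log += "complete"
        }
        .collect()
    log
}

fun main() {
    println(runEventLoop()) // prints [start, value 1, value 2, value 3, complete]
}
```

Note that `consumeAsFlow()` can be collected only once and cancels the channel when collection ends, which suits a single consumer that owns the loop's lifetime.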