pakoito
09/14/2022, 3:59 PMval pub = MutableSharedFlow<WorkJob>().apply {
onEach { println("Each") }
.onStart { println("Start") }
.onCompletion { println("Complete") }
.launchIn(GlobalScope)
}
...
somewhere else in code:
pub.tryEmit(WorkJob())
That triggers "start" but doesn't emit any value. The moment I put MutableSharedFlow<Job>(replay = 10)
it works, printing "start" and "each". But I want an infinite stream, not a repetition or something that goes to the latest value.Casey Brooks
09/14/2022, 4:05 PMChannel
instead. You can easily consume the Channel as a Flow
with channel.receiveAsFlow()
for a nicer read API.
MutableSharedFlow
is intended for exactly that, sharing values among multiple subscribers. It’s not designed to replace Channel
as a job queuepakoito
09/14/2022, 4:20 PMSinks.many().unicast().onBackpressureBuffer<Job>()
Casey Brooks
09/14/2022, 4:27 PMMutableSharedFlow
, by default it intentionally drops emissions if there are no subscribers. You can configure it to not necessarily do that, but extraBufferCapacity
is what you’re looking for instead of replay
. That would give you the kind of behaviour you’re looking for.
But ultimately, the decision between MutableSharedFlow
and Channel
comes down to how to need to read those values. 1 value processed exactly once? Use a Channel. 1 value processed by multiple subscribers? Use MutableSharedFlow
. The API for both is similar, it’s a matter of using the right tool for the job, rather than trying to use MutableSharedFlow
when a Channel would work better, just because it’s newerpakoito
09/14/2022, 4:28 PMpakoito
09/14/2022, 4:28 PMpakoito
09/14/2022, 4:28 PMCasey Brooks
09/14/2022, 4:30 PMsimon.vergauwen
09/15/2022, 7:18 AMChannel
for this @pakoito, you can call close
in the same way you'd call onError
or onComplete
for the Subject
.
Using the suspend on overflow settings for Channel
instead of the default buffer one.
I then consumeAsFlow
and start/stop my event loop on using onStart/onCompletion
.
https://github.com/nomisRev/kotlin-kafka/blob/a5bf31fba92038d2b16630731b8a77527f0a[…]/kotlin/io/github/nomisRev/kafka/receiver/internals/PollLoop.kt