# coroutines
p
An easy one: I want a job queue, something that'd be like a PublishSubject in RxJava. I want an infinite stream that suspends if it goes over buffer and never drops values. I'm trying a MutableSharedFlow subscribed since object creation that acts on every new value:
val pub = MutableSharedFlow<WorkJob>().apply { 
  onEach { println("Each") }
  .onStart { println("Start") }
  .onCompletion { println("Complete") }
  .launchIn(GlobalScope)
}

...

somewhere else in code:

pub.tryEmit(WorkJob())
That triggers "start" but doesn't emit any value. The moment I put `MutableSharedFlow<Job>(replay = 10)` it works, printing "start" and "each". But I want an infinite stream, not a repetition or something that goes to the latest value.
c
I think you’re looking for a `Channel` instead. You can easily consume the Channel as a `Flow` with `channel.receiveAsFlow()` for a nicer read API. `MutableSharedFlow` is intended for exactly that, sharing values among multiple subscribers. It’s not designed to replace `Channel` as a job queue.
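A minimal sketch of the `Channel`-as-job-queue suggestion above, assuming a bounded buffer of 2 and a hypothetical `WorkJob` type with an `id` field (neither comes from the thread). `send` suspends when the buffer is full instead of dropping, and `receiveAsFlow()` gives the read side a `Flow` API:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical job type standing in for the WorkJob from the question.
data class WorkJob(val id: Int)

fun runQueue(): List<Int> = runBlocking {
    // Assumed capacity of 2: send() suspends while two jobs are already
    // waiting, so no job is ever dropped.
    val queue = Channel<WorkJob>(capacity = 2)

    // Producer: enqueue five jobs, then close so the consumer can finish.
    launch {
        repeat(5) { queue.send(WorkJob(it)) }
        queue.close()
    }

    // Consumer: each job is delivered exactly once, in FIFO order.
    queue.receiveAsFlow().map { it.id }.toList()
}
```

Calling `runQueue()` should return the ids in the order they were sent. In a real queue the channel would typically stay open and the consumer would run in a long-lived scope rather than inside `runBlocking`.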
p
`Channel` was failing emissions so I went back to greatest hits with Project Reactor
Sinks.many().unicast().onBackpressureBuffer<Job>()
c
That’s the design of `MutableSharedFlow`: by default it intentionally drops emissions if there are no subscribers. You can configure it to not necessarily do that, but `extraBufferCapacity` is what you’re looking for instead of `replay`. That would give you the kind of behaviour you’re looking for. But ultimately, the decision between `MutableSharedFlow` and `Channel` comes down to how you need to read those values. 1 value processed exactly once? Use a `Channel`. 1 value processed by multiple subscribers? Use `MutableSharedFlow`. The API for both is similar; it’s a matter of using the right tool for the job, rather than trying to use `MutableSharedFlow` when a `Channel` would work better just because it’s newer.
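A small sketch of the `extraBufferCapacity` point, under the assumption of a buffer size of 8 and `Int` payloads (both chosen only for illustration). With the default configuration (no replay, no extra buffer), `tryEmit` cannot hand a value to an attached subscriber without suspending, so it returns `false`; with extra buffer space it succeeds and nothing is replayed to later subscribers:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun compareTryEmit(): Triple<Boolean, List<Boolean>, List<Int>> = runBlocking {
    // Default configuration: replay = 0, extraBufferCapacity = 0.
    val unbuffered = MutableSharedFlow<Int>()
    val idle = launch { unbuffered.collect { /* ignore values */ } }
    // Wait until the collector above is actually subscribed.
    unbuffered.subscriptionCount.first { it > 0 }
    // No buffer and a live subscriber: tryEmit cannot suspend, so it fails.
    val unbufferedAccepted = unbuffered.tryEmit(1)
    idle.cancel()

    // Same flow type with room for 8 pending values (size is an assumption).
    val buffered = MutableSharedFlow<Int>(extraBufferCapacity = 8)
    val received = async { buffered.take(3).toList() }
    buffered.subscriptionCount.first { it > 0 }
    // Each value is parked in the buffer until the subscriber picks it up.
    val accepted = (1..3).map { buffered.tryEmit(it) }

    Triple(unbufferedAccepted, accepted, received.await())
}
```

Checking the `Boolean` returned by `tryEmit` is what reveals the difference; once the buffer is full and the overflow strategy is the default `SUSPEND`, the suspending `emit` is the call that waits instead of failing.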
p
I meant Channel was failing on me 😄
this is for a prototype so it doesn't matter much, Reactor is doing the job
thanks @Casey Brooks
c
Especially on a prototype, it’s probably worth exploring how these different coroutine features work, and finding and understanding the right set of configuration values for your needs, because they’re ultimately going to work better with other Kotlin code/libraries than the Java-based Reactor will
s
I'm using a plain `Channel` for this @pakoito, you can call `close` in the same way you'd call `onError` or `onComplete` for the `Subject`. Using the suspend on overflow settings for `Channel` instead of the default buffer one. I then `consumeAsFlow` and start/stop my event loop using `onStart`/`onCompletion`. https://github.com/nomisRev/kotlin-kafka/blob/a5bf31fba92038d2b16630731b8a77527f0a[…]/kotlin/io/github/nomisRev/kafka/receiver/internals/PollLoop.kt
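The pattern described in the last message could look roughly like the sketch below. It is not taken from the linked `PollLoop.kt`; the capacity of 1, the `String` payloads, and the name `eventLoopTrace` are assumptions for illustration. `close()` plays the role of `onComplete`, and `close(cause)` plays the role of `onError`:

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records what the consumer observes.
fun eventLoopTrace(failure: Throwable? = null): List<String> = runBlocking {
    val trace = mutableListOf<String>()
    // Bounded channel that suspends senders on overflow (capacity assumed).
    val jobs = Channel<String>(capacity = 1, onBufferOverflow = BufferOverflow.SUSPEND)

    launch {
        jobs.send("a")
        jobs.send("b")
        // close() ends the stream normally; close(cause) ends it with an error.
        jobs.close(failure)
    }

    jobs.consumeAsFlow()
        .onStart { trace += "start" }          // e.g. start the event loop
        .onCompletion { cause ->               // e.g. stop the event loop
            trace += if (cause == null) "complete" else "failed: ${cause.message}"
        }
        .catch { /* failure already recorded in onCompletion */ }
        .collect { trace += "job $it" }

    trace
}
```

Unlike `receiveAsFlow()`, a flow from `consumeAsFlow()` can be collected only once and cancels the channel when collection ends, which fits a single-consumer event loop.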