simon.vergauwen
03/21/2020, 12:04 PMQueue
works just like any other non-blocking Queue
you might be familiar with.
You can split concerns into Enqueue
and Dequeue
if you want, to split the concern into who is producing events and who is consuming them.
They all work on a 1-offer to 1-taker basis tho, since it removes from the Queue
unless you use the peek
operators. So if you consume an event with take
than it’s also removed from the Queue
, so this work similarly to java.util.concurrent.Queue
or scala.concurrent.Queue
but with a functional non-blocking api in F
with full support for cancelation etc.
We’ll be writing on some additional docs on Queue
and the other new stuff soon, so if there are any specific use-cases you’re trying to solve you cannot solve with the current feature set in 0.10.5 I’d happy to look into them and perhaps include them in some form of documentation.kartoffelsup
03/21/2020, 12:16 PMpakoito
03/21/2020, 12:57 PMpakoito
03/21/2020, 12:58 PMMap<Class<*>, List<Queue>>
kartoffelsup
03/21/2020, 3:21 PMkartoffelsup
03/24/2020, 7:30 PMkartoffelsup
03/24/2020, 8:01 PMpakoito
03/24/2020, 8:28 PMpakoito
03/24/2020, 8:28 PMpakoito
03/24/2020, 8:30 PMtake()
to get resultspakoito
03/24/2020, 8:31 PMpakoito
03/24/2020, 8:31 PMpakoito
03/24/2020, 8:32 PMfun bla() = IO.fx {
val result = !eventBus.listen(Request::class)
!effect { somethingWith(result) }
if (!eventBus.globalStop()) {
!bla()
}
}
kartoffelsup
03/24/2020, 9:54 PMpakoito
03/24/2020, 9:57 PMpakoito
03/24/2020, 9:57 PMpakoito
03/24/2020, 9:58 PMpakoito
03/24/2020, 9:59 PM.attempt().unsafeRunSync()
that’s blocking. We know how to make it non-blocking for our own internal stuff but not when mixed with kotlinx.coroutinespakoito
03/24/2020, 9:59 PMattempt().suspended()
insteadpakoito
03/24/2020, 9:59 PMThread.sleep(15000L)
doesn’t work for either Arrow or kx.coroutines, as it’s also blockingpakoito
03/24/2020, 10:01 PMIO(IO.dispatchers().io()) { }.unsafeRunSync()
pakoito
03/24/2020, 10:02 PMpakoito
03/24/2020, 10:02 PMpakoito
03/24/2020, 10:02 PMpakoito
03/24/2020, 10:02 PMkartoffelsup
03/24/2020, 10:54 PMkartoffelsup
03/25/2020, 6:39 AMTest
Checking for events...
foobar
take
Offer: Event@2cfe9c1a
Event sent.
kartoffelsup
03/25/2020, 6:40 AMkartoffelsup
03/25/2020, 6:52 AMpakoito
03/25/2020, 11:30 AMpakoito
03/25/2020, 11:30 AMpakoito
03/25/2020, 11:30 AMsimon.vergauwen
03/25/2020, 11:35 AMsimon.vergauwen
03/26/2020, 2:05 PMKind<F, Queue<F, Event>>
, but that’s the factory of the Queue
. So you want to store Queue<F, Event>
instead which is the actual Queue
with its state.
Now you were creating a new Queue
before you called take
, so you were offering
and taking
from different `Queue`s.simon.vergauwen
03/26/2020, 2:06 PMsimon.vergauwen
03/26/2020, 2:08 PMprivate val listeners: MutableMap<KClass<*>, MutableList<Queue<F, Event>>> =
Collections.synchronizedMap(mutableMapOf())
and replace it with a Ref
. Why are you storing a list of `Queue`s for a given type? Wouldn’t Ref<Map<KClass<*>, Queue<F, Event>>
be more desired?kartoffelsup
03/26/2020, 5:14 PMsimon.vergauwen
03/26/2020, 5:17 PMAtomicReference
except it has some more powerful operators like update
and modify
which require looping over compareAndSet
if you do it directly over AtomicReference
. It has a MonadDefer<F>
constraint.simon.vergauwen
03/26/2020, 5:18 PMRef
after dinner 😉kartoffelsup
03/26/2020, 5:18 PMsimon.vergauwen
03/26/2020, 5:18 PMsimon.vergauwen
03/26/2020, 6:24 PMIO
but a quite nice EventBus
with Ref
. I was wondering, wouldn’t you want an Unbounded Queue
instead of a List<Queue<F, Event>>
? That way you would have a single Queue
per type, and you could install as many takes
as you want.kartoffelsup
03/26/2020, 7:49 PMkartoffelsup
03/26/2020, 8:12 PMkartoffelsup
03/26/2020, 8:14 PMkartoffelsup
03/26/2020, 9:16 PMpakoito
03/26/2020, 10:30 PMfork
necessary there?simon.vergauwen
03/27/2020, 9:13 AMYes, an unbounded Queue sounds even better, though I only see a bounded companion function on Queue? How do I get to unbounded?Dropping, sliding and unbounded will be released in 0.10.5 in the following days. I’ll take a look later to see if we can refactor it to use
Unbounded
instead 😉simon.vergauwen
03/27/2020, 9:20 AMremind me again, is theThe fork there is sonecessary there?fork
Enqueue
can happen in parallel from Dequeue
.pakoito
03/27/2020, 10:24 AMsimon.vergauwen
03/27/2020, 10:26 AMflatMap
. So there are async jumps but no parallelism.
I should check that piece of code again but raceN
could also be interessing there to launch parallel process and wire the cancellation.pakoito
03/27/2020, 10:27 AMsimon.vergauwen
03/27/2020, 10:28 AMflatMap
and deliverEvents
loops forever, so otherwise it will never reach the effect
that sets up Ktor
.pakoito
03/27/2020, 10:29 AMsimon.vergauwen
03/27/2020, 10:30 AMdeliveryEvents
into a fiber running on the io
pool. It’s should be fine to launch on default
as well tho, there is no blocking IO going on there.kartoffelsup
03/31/2020, 8:21 PM